Compare commits

..

No commits in common. "27df649fbfda30bcd6b331400dcfe395a8476000" and "573268bac1ab67dfb5117a26766f9bed88ee717a" have entirely different histories.

2 changed files with 21 additions and 60 deletions

View File

@ -597,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
''' '''
async with ( async with (
httpx.AsyncClient( httpx.AsyncClient(
base_url='https://api.kucoin.com/api', base_url=f'https://api.kucoin.com/api',
) as trio_client, ) as trio_client,
): ):
client = Client(httpx_client=trio_client) client = Client(httpx_client=trio_client)
@ -641,7 +641,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection') log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -653,14 +653,9 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[ ) -> tuple[MktPair, KucoinMktPair]:
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return both a `piker.accounting.MktPair` and Query for and return a `MktPair` and `KucoinMktPair`.
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -735,8 +730,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -744,11 +737,7 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n') connect_id = str(uuid4())
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -762,37 +751,20 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing( aclosing(stream_messages(ws, sym_str)) as msg_gen,
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(iter_quotes) typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real while typ != 'trade':
# trade quote? # take care to not unblock here until we get a real
# ^TODO, remove this right? # trade quote
# -[ ] what often blocks chart boot/new-feed switching typ, quote = await anext(msg_gen)
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix! async for typ, msg in msg_gen:
# OW the sampling loop will not broadcast correctly.. await send_chan.send({sym_str: msg})
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -847,7 +819,7 @@ async def subscribe(
) )
async def iter_normed_quotes( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -878,9 +850,6 @@ async def iter_normed_quotes(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -973,7 +942,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
log.debug( print(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -653,11 +653,7 @@ class Router(Struct):
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -720,7 +716,7 @@ class Router(Struct):
subs = self.subscribers[sub_key] subs = self.subscribers[sub_key]
sent_some: bool = False sent_some: bool = False
for client_stream in subs.copy(): for client_stream in subs:
try: try:
await client_stream.send(msg) await client_stream.send(msg)
sent_some = True sent_some = True
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req: await router.client_broadcast(
# likely some order change state? status_msg.req.symbol,
await tractor.pause() status_msg,
else: )
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')