diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 3142720c..6546b949 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -32,6 +32,7 @@ import hmac import hashlib import wsproto from uuid import uuid4 +from functools import partial import asks import tractor @@ -443,6 +444,28 @@ async def open_symbol_search( await stream.send(await client.search_symbols(pattern)) log.info('Kucoin symbol search opened') +@acm +async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): + ''' + Spawn a non-blocking task that pings the ws + server every ping_interval so Kucoin doesn't drop + our connection + + ''' + async with trio.open_nursery() as n: + # TODO: cache this task so it's only called once + async def ping_server(): + while True: + await trio.sleep((ping_interval - 1000) / 1000) + await ws.send_msg({'id': connect_id, 'type': 'ping'}) + + log.info(f'Starting ping task for kucoin ws connection') + n.start_soon(ping_server) + + yield ws + + n.cancel_scope.cancel() + async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -457,121 +480,89 @@ async def stream_quotes( Where the rubber hits the road baby ''' - connect_id = str(uuid4()) - async with open_cached_client('kucoin') as client: - log.info('Starting up quote stream') - # loop through symbols and sub to feedz - for sym in symbols: - token, ping_interval = await client._get_ws_token() - pairs = await client.cache_pairs() - pair: KucoinMktPair = pairs[sym] - kucoin_sym = pair.symbol + token, ping_interval = await client._get_ws_token() + connect_id = str(uuid4()) + pairs = await client.cache_pairs() - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': { - 'asset_type': 'crypto', - 'price_tick_size': float(pair.baseIncrement), - 'lot_tick_size': float(pair.baseMinSize), - }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } + # open ping task + async with ( + open_autorecon_ws( + f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]' + ) as ws, + open_ping_task(ws, ping_interval, connect_id) as ws, + ): + log.info('Starting up quote stream') + # loop through symbols and sub to feedz + for sym in symbols: + pair: KucoinMktPair = pairs[sym] + kucoin_sym = pair.symbol - @acm - async def subscribe(ws: wsproto.WSConnection): + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': { + 'asset_type': 'crypto', + 'price_tick_size': float(pair.baseIncrement), + 'lot_tick_size': float(pair.baseMinSize), + }, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + } + } - @acm - async def open_ping_task(ws: wsproto.WSConnection): - ''' - Spawn a non-blocking task that pings the ws - server every ping_interval so Kucoin doesn't drop - our connection - ''' - async with trio.open_nursery() as n: - # TODO: cache this task so it's only called once - async def ping_server(): - while True: - await trio.sleep((ping_interval - 1000) / 1000) - await ws.send_msg({'id': connect_id, 'type': 'ping'}) - - log.info(f'Starting ping task for {sym}') - n.start_soon(ping_server) - - yield ws - - n.cancel_scope.cancel() - - # Spawn the ping task here - async with open_ping_task(ws) as ws: - tasks = [] - tasks.append(make_sub(kucoin_sym, connect_id, level='l3')) - tasks.append(make_sub(kucoin_sym, connect_id, level='l1')) - - for task in tasks: - log.info( - f'Subscribing to {task["topic"]} feed for {sym}') - await ws.send_msg(task) - - yield - - # unsub - if ws.connected(): - log.info(f'Unsubscribing to {kucoin_sym} feed') - await ws.send_msg( - { - 'id': connect_id, - 'type': 'unsubscribe', - 'topic': f'/market/ticker:{sym}', - 'privateChannel': False, - 'response': True, - } - ) - - async with ( - open_autorecon_ws( - f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', - fixture=subscribe, - ) as ws, - stream_messages(ws, sym) as msg_gen, - ): - typ, quote = await anext(msg_gen) - - while typ != 'trade': - # take care to not unblock here until we get a real trade quote + async with ( + subscribe(ws, connect_id, kucoin_sym), + stream_messages(ws, sym) as msg_gen, + ): typ, quote = await anext(msg_gen) + while typ != 'trade': + # take care to not unblock here until we get a real trade quote + typ, quote = await anext(msg_gen) - task_status.started((init_msgs, quote)) - feed_is_live.set() + task_status.started((init_msgs, quote)) + feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym: msg}) + async for typ, msg in msg_gen: + await send_chan.send({sym: msg}) +@acm +async def subscribe(ws: wsproto.WSConnection, connect_id, sym): + # breakpoint() + # level 2 sub + await ws.send_msg({ + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/spotMarket/level2Depth5:{sym}', + 'privateChannel': False, + 'response': True, + }) -def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool] | None: - match level: - case 'l1': - return { + # watch trades + await ws.send_msg({ + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/market/ticker:{sym}', + 'privateChannel': False, + 'response': True, + }) + + yield + + # unsub + if ws.connected(): + log.info(f'Unsubscribing to {syn} feed') + await ws.send_msg( + { 'id': connect_id, - 'type': 'subscribe', - 'topic': f'/spotMarket/level2Depth5:{sym}', - 'privateChannel': False, - 'response': True, - } - - case 'l3': - return { - 'id': connect_id, - 'type': 'subscribe', + 'type': 'unsubscribe', 'topic': f'/market/ticker:{sym}', 'privateChannel': False, 'response': True, } + ) @trio_async_generator