Refactor streaming logic to be less nested and readable

kucoin_backend
jaredgoldman 2023-04-16 10:11:17 -04:00
parent 8403d8a482
commit 9706803220
1 changed files with 91 additions and 100 deletions

View File

@ -32,6 +32,7 @@ import hmac
import hashlib import hashlib
import wsproto import wsproto
from uuid import uuid4 from uuid import uuid4
from functools import partial
import asks import asks
import tractor import tractor
@ -443,6 +444,28 @@ async def open_symbol_search(
await stream.send(await client.search_symbols(pattern)) await stream.send(await client.search_symbols(pattern))
log.info('Kucoin symbol search opened') log.info('Kucoin symbol search opened')
@acm
async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id):
'''
Spawn a non-blocking task that pings the ws
server every ping_interval so Kucoin doesn't drop
our connection
'''
async with trio.open_nursery() as n:
# TODO: cache this task so it's only called once
async def ping_server():
while True:
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info(f'Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield ws
n.cancel_scope.cancel()
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
@ -457,121 +480,89 @@ async def stream_quotes(
Where the rubber hits the road baby Where the rubber hits the road baby
''' '''
connect_id = str(uuid4())
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
log.info('Starting up quote stream') token, ping_interval = await client._get_ws_token()
# loop through symbols and sub to feedz connect_id = str(uuid4())
for sym in symbols: pairs = await client.cache_pairs()
token, ping_interval = await client._get_ws_token()
pairs = await client.cache_pairs()
pair: KucoinMktPair = pairs[sym]
kucoin_sym = pair.symbol
init_msgs = { # open ping task
# pass back token, and bool, signalling if we're the writer async with (
# and that history has been written open_autorecon_ws(
sym: { f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]'
'symbol_info': { ) as ws,
'asset_type': 'crypto', open_ping_task(ws, ping_interval, connect_id) as ws,
'price_tick_size': float(pair.baseIncrement), ):
'lot_tick_size': float(pair.baseMinSize), log.info('Starting up quote stream')
}, # loop through symbols and sub to feedz
'shm_write_opts': {'sum_tick_vml': False}, for sym in symbols:
'fqsn': sym, pair: KucoinMktPair = pairs[sym]
}, kucoin_sym = pair.symbol
}
@acm init_msgs = {
async def subscribe(ws: wsproto.WSConnection): # pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'crypto',
'price_tick_size': float(pair.baseIncrement),
'lot_tick_size': float(pair.baseMinSize),
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}
}
@acm
async def open_ping_task(ws: wsproto.WSConnection):
'''
Spawn a non-blocking task that pings the ws
server every ping_interval so Kucoin doesn't drop
our connection
''' async with (
async with trio.open_nursery() as n: subscribe(ws, connect_id, kucoin_sym),
# TODO: cache this task so it's only called once stream_messages(ws, sym) as msg_gen,
async def ping_server(): ):
while True:
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info(f'Starting ping task for {sym}')
n.start_soon(ping_server)
yield ws
n.cancel_scope.cancel()
# Spawn the ping task here
async with open_ping_task(ws) as ws:
tasks = []
tasks.append(make_sub(kucoin_sym, connect_id, level='l3'))
tasks.append(make_sub(kucoin_sym, connect_id, level='l1'))
for task in tasks:
log.info(
f'Subscribing to {task["topic"]} feed for {sym}')
await ws.send_msg(task)
yield
# unsub
if ws.connected():
log.info(f'Unsubscribing to {kucoin_sym} feed')
await ws.send_msg(
{
'id': connect_id,
'type': 'unsubscribe',
'topic': f'/market/ticker:{sym}',
'privateChannel': False,
'response': True,
}
)
async with (
open_autorecon_ws(
f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]',
fixture=subscribe,
) as ws,
stream_messages(ws, sym) as msg_gen,
):
typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real trade quote
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real trade quote
typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
async for typ, msg in msg_gen: async for typ, msg in msg_gen:
await send_chan.send({sym: msg}) await send_chan.send({sym: msg})
@acm
async def subscribe(ws: wsproto.WSConnection, connect_id, sym):
# breakpoint()
# level 2 sub
await ws.send_msg({
'id': connect_id,
'type': 'subscribe',
'topic': f'/spotMarket/level2Depth5:{sym}',
'privateChannel': False,
'response': True,
})
def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool] | None: # watch trades
match level: await ws.send_msg({
case 'l1': 'id': connect_id,
return { 'type': 'subscribe',
'topic': f'/market/ticker:{sym}',
'privateChannel': False,
'response': True,
})
yield
# unsub
if ws.connected():
log.info(f'Unsubscribing to {syn} feed')
await ws.send_msg(
{
'id': connect_id, 'id': connect_id,
'type': 'subscribe', 'type': 'unsubscribe',
'topic': f'/spotMarket/level2Depth5:{sym}',
'privateChannel': False,
'response': True,
}
case 'l3':
return {
'id': connect_id,
'type': 'subscribe',
'topic': f'/market/ticker:{sym}', 'topic': f'/market/ticker:{sym}',
'privateChannel': False, 'privateChannel': False,
'response': True, 'response': True,
} }
)
@trio_async_generator @trio_async_generator