Add L1 spread streaming to kraken

bar_select
Tyler Goodlet 2020-11-06 11:35:40 -05:00
parent 043bc985df
commit be4a3df7ba
1 changed files with 129 additions and 67 deletions
piker/brokers

View File

@ -176,8 +176,9 @@ class OHLC:
setattr(self, f, val.type(getattr(self, f)))
async def recv_ohlc(recv):
async def recv_msg(recv):
too_slow_count = last_hb = 0
while True:
with trio.move_on_after(1.5) as cs:
msg = await recv()
@ -194,20 +195,50 @@ async def recv_ohlc(recv):
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
now = time.time()
delay = now - last_hb
last_hb = now
log.trace(f"Heartbeat after {delay}")
# TODO: hmm i guess we should use this
# for determining when to do connection
# resets eh?
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
chan_id, ohlc_array, chan_name, pair = msg
yield OHLC(chan_id, chan_name, pair, *ohlc_array)
chan_id, *payload_array, chan_name, pair = msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
def normalize(
@ -226,6 +257,21 @@ def normalize(
return topic, quote
def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe
"""
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'pair': pairs,
'event': 'subscribe',
'subscription': data,
}
# @tractor.msg.pub
async def stream_quotes(
# get_topics: Callable,
@ -247,6 +293,7 @@ async def stream_quotes(
ws_pairs = {}
async with get_client() as client:
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
@ -280,31 +327,36 @@ async def stream_quotes(
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
# setup subs
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
subs = {
'pair': list(ws_pairs.values()),
'event': 'subscribe',
'subscription': {
'name': sub_type,
'interval': 1, # 1 min
# 'name': 'ticker',
# 'name': 'openOrders',
# 'depth': '25',
},
}
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(subs))
await ws.send_message(json.dumps(ohlc_sub))
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
await ws.send_message(json.dumps(l1_sub))
async def recv():
return json.loads(await ws.get_message())
# pull a first quote and deliver
ohlc_gen = recv_ohlc(recv)
ohlc_last = await ohlc_gen.__anext__()
msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last)
@ -315,12 +367,19 @@ async def stream_quotes(
last_interval_start = ohlc_last.etime
# start streaming
async for ohlc in ohlc_gen:
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
if ohlc.etime > last_interval_start: # new interval
# new interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
@ -368,12 +427,15 @@ async def stream_quotes(
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1':
quote = ohlc
topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote}
ohlc_last = ohlc
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")