Compare commits

...

2 Commits

Author SHA1 Message Date
Guillermo Rodriguez e56d065dbc
Add fill_bars function to kraken 2021-01-04 08:30:05 -03:00
Guillermo Rodriguez b1fd986a3a
Change kraken.stream_quotes to use tractor.stream api 2021-01-02 12:35:38 -03:00
1 changed files with 188 additions and 141 deletions

View File

@ -17,9 +17,11 @@
""" """
Kraken backend. Kraken backend.
""" """
import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import json import json
import time import time
@ -290,170 +292,215 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
# @tractor.msg.pub async def fill_bars(
async def stream_quotes( first_bars,
# get_topics: Callable, shm,
shm_token: Tuple[str, str, List[tuple]], symbol: str,
symbols: List[str] = ['XBTUSD', 'XMRUSD'], count: int = 75
# These are the symbols not expected by the ws api
# they are looked up inside this routine.
sub_type: str = 'ohlc',
loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
"""
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
async with get_client() as client: async with get_client() as client:
# keep client cached for real-time section next_dt = first_bars[0][1]
for sym in symbols: i = 0
ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] while i < count:
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0]
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
shm.push(bars)
shm_token = shm.token
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
yield shm_token, not writer_exists
while True:
try: try:
async with trio_websocket.open_websocket_url( bars_array = await client.bars(
'wss://ws.kraken.com/', symbol=symbol,
) as ws: since=arrow.get(next_dt).floor('minute')
.shift(minutes=-720).timestamp
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
# XXX: setup subs await trio.sleep(5)
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should except BaseException as e:
# be completely fine to request from a separate task log.exception(e)
# since internally the ws methods appear to be FIFO await tractor.breakpoint()
# locked.
await ws.send_message(json.dumps(ohlc_sub))
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
) _local_buffer_writers = {}
await ws.send_message(json.dumps(l1_sub))
async def recv():
return json.loads(await ws.get_message())
# pull a first quote and deliver @asynccontextmanager
msg_gen = recv_msg(recv) async def activate_writer(key: str) -> (bool, trio.Nursery):
typ, ohlc_last = await msg_gen.__anext__() try:
writer_already_exists = _local_buffer_writers.get(key, False)
topic, quote = normalize(ohlc_last) if not writer_already_exists:
_local_buffer_writers[key] = True
# packetize as {topic: quote} async with trio.open_nursery() as n:
yield {topic: quote} yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming @tractor.stream
async for typ, ohlc in msg_gen: async def stream_quotes(
ctx: tractor.Context,
symbols: List[str],
shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None,
# compat for @tractor.msg.pub
topics: Any = None
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
async with get_client() as client:
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
if typ == 'ohlc': symbol = symbols[0]
# TODO: can get rid of all this by using if not writer_already_exists:
# ``trades`` subscription... shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
# generate tick values to match time & sales pane: shm.push(bars)
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m shm_token = shm.token
volume = ohlc.volume
# new interval ln.start_soon(fill_bars, bars, shm, symbol)
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
last = ohlc.close times = shm.array['time']
if tick_volume: delay_s = times[-1] - times[times != times[-1]][-1]
ohlc.ticks.append({ subscribe_ohlc_for_increment(shm, delay_s)
'type': 'trade',
'price': last, # pass back token, and bool, signalling if we're the writer
'size': tick_volume, await ctx.send_yield((shm_token, not writer_already_exists))
})
topic, quote = normalize(ohlc) while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# if we are the lone tick writer start writing # TODO: we want to eventually allow unsubs which should
# the buffer with appropriate trade data # be completely fine to request from a separate task
if not writer_exists: # since internally the ws methods appear to be FIFO
# update last entry # locked.
# benchmarked in the 4-5 us range await ws.send_message(json.dumps(ohlc_sub))
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v: # trade data (aka L1)
# no trades for this bar yet so the open l1_sub = make_sub(
# is also the close/last trade price list(ws_pairs.values()),
o = last {'name': 'spread'} # 'depth': 10}
# write shm )
shm.array[ await ws.send_message(json.dumps(l1_sub))
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1': async def recv():
quote = ohlc return json.loads(await ws.get_message())
topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub`` # pull a first quote and deliver
# requires a ``Dict[topic: str, quote: dict]`` msg_gen = recv_msg(recv)
yield {topic: quote} typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last)
# packetize as {topic: quote}
await ctx.send_yield({topic: quote})
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# new interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_already_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1':
quote = ohlc
topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
await ctx.send_yield({topic: quote})
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")