Add fill_bars function to kraken

kraken_history
Guillermo Rodriguez 2021-01-02 12:34:17 -03:00
parent b1fd986a3a
commit e56d065dbc
No known key found for this signature in database
GPG Key ID: 3F61096EC7DF75A8
1 changed files with 180 additions and 296 deletions

View File

@ -17,6 +17,8 @@
"""
Kraken backend.
"""
import sys
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
@ -290,173 +292,53 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
}
# @tractor.msg.pub
#async def stream_quotes(
# # get_topics: Callable,
# shm_token: Tuple[str, str, List[tuple]],
# symbols: List[str] = ['XBTUSD', 'XMRUSD'],
# # These are the symbols not expected by the ws api
# # they are looked up inside this routine.
# sub_type: str = 'ohlc',
# loglevel: str = None,
# # compat with eventual ``tractor.msg.pub``
# topics: Optional[List[str]] = None,
#) -> None:
# """Subscribe for ohlc stream of quotes for ``pairs``.
#
# ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
# """
# # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel)
#
# ws_pairs = {}
# async with get_client() as client:
#
# # keep client cached for real-time section
# for sym in symbols:
# ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
#
# # maybe load historical ohlcv in to shared mem
# # check if shm has already been created by previous
# # feed initialization
# writer_exists = get_shm_token(shm_token['shm_name'])
#
# symbol = symbols[0]
#
# if not writer_exists:
# shm = attach_shm_array(
# token=shm_token,
# # we are writer
# readonly=False,
# )
# bars = await client.bars(symbol=symbol)
#
# shm.push(bars)
# shm_token = shm.token
#
# times = shm.array['time']
# delay_s = times[-1] - times[times != times[-1]][-1]
# subscribe_ohlc_for_increment(shm, delay_s)
#
# yield shm_token, not writer_exists
#
# while True:
# try:
# async with trio_websocket.open_websocket_url(
# 'wss://ws.kraken.com/',
# ) as ws:
#
# # XXX: setup subs
# # https://docs.kraken.com/websockets/#message-subscribe
# # specific logic for this in kraken's shitty sync client:
# # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
# ohlc_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'ohlc', 'interval': 1}
# )
#
# # TODO: we want to eventually allow unsubs which should
# # be completely fine to request from a separate task
# # since internally the ws methods appear to be FIFO
# # locked.
# await ws.send_message(json.dumps(ohlc_sub))
#
# # trade data (aka L1)
# l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#
# )
# await ws.send_message(json.dumps(l1_sub))
#
# async def recv():
# return json.loads(await ws.get_message())
#
# # pull a first quote and deliver
# msg_gen = recv_msg(recv)
# typ, ohlc_last = await msg_gen.__anext__()
#
# topic, quote = normalize(ohlc_last)
#
# # packetize as {topic: quote}
# yield {topic: quote}
#
# # keep start of last interval for volume tracking
# last_interval_start = ohlc_last.etime
#
# # start streaming
# async for typ, ohlc in msg_gen:
#
# if typ == 'ohlc':
#
# # TODO: can get rid of all this by using
# # ``trades`` subscription...
#
# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume
#
# # new interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume
# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume
#
# last = ohlc.close
# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })
#
# topic, quote = normalize(ohlc)
#
# # if we are the lone tick writer start writing
# # the buffer with appropriate trade data
# if not writer_exists:
# # update last entry
# # benchmarked in the 4-5 us range
# o, high, low, v = shm.array[-1][
# ['open', 'high', 'low', 'volume']
# ]
# new_v = tick_volume
#
# if v == 0 and new_v:
# # no trades for this bar yet so the open
# # is also the close/last trade price
# o = last
#
# # write shm
# shm.array[
# ['open',
# 'high',
# 'low',
# 'close',
# 'bar_wap', # in this case vwap of bar
# 'volume']
# ][-1] = (
# o,
# max(high, last),
# min(low, last),
# last,
# ohlc.vwap,
# volume,
# )
# ohlc_last = ohlc
#
# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol']
#
# # XXX: format required by ``tractor.msg.pub``
# # requires a ``Dict[topic: str, quote: dict]``
# yield {topic: quote}
#
# except (ConnectionClosed, DisconnectionTimeout):
# log.exception("Good job kraken...reconnecting")
async def fill_bars(
first_bars,
shm,
symbol: str,
count: int = 75
) -> None:
async with get_client() as client:
next_dt = first_bars[0][1]
i = 0
while i < count:
try:
bars_array = await client.bars(
symbol=symbol,
since=arrow.get(next_dt).floor('minute')
.shift(minutes=-720).timestamp
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
await trio.sleep(5)
except BaseException as e:
log.exception(e)
await tractor.breakpoint()
_local_buffer_writers = {}
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
@tractor.stream
@ -473,150 +355,152 @@ async def stream_quotes(
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
async with get_client() as client:
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
async with get_client() as client:
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0]
symbol = symbols[0]
await tractor.breakpoint()
if not writer_already_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
shm.push(bars)
shm_token = shm.token
shm.push(bars)
shm_token = shm.token
ln.start_soon(fill_bars, bars, shm, symbol)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_exists))
# pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_already_exists))
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(ohlc_sub))
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(ohlc_sub))
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
await ws.send_message(json.dumps(l1_sub))
)
await ws.send_message(json.dumps(l1_sub))
async def recv():
return json.loads(await ws.get_message())
async def recv():
return json.loads(await ws.get_message())
# pull a first quote and deliver
msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
# pull a first quote and deliver
msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last)
topic, quote = normalize(ohlc_last)
# packetize as {topic: quote}
await ctx.send_yield({topic: quote})
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# new interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1':
quote = ohlc
topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
# packetize as {topic: quote}
await ctx.send_yield({topic: quote})
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# new interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_already_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1':
quote = ohlc
topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
await ctx.send_yield({topic: quote})
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")