Compare commits

...

2 Commits

Author SHA1 Message Date
Guillermo Rodriguez e56d065dbc
Add fill_bars function to kraken 2021-01-04 08:30:05 -03:00
Guillermo Rodriguez b1fd986a3a
Change kraken.stream_quotes to use tractor.stream api 2021-01-02 12:35:38 -03:00
1 changed files with 188 additions and 141 deletions

View File

@ -17,9 +17,11 @@
""" """
Kraken backend. Kraken backend.
""" """
import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import json import json
import time import time
@ -290,40 +292,81 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
# @tractor.msg.pub async def fill_bars(
async def stream_quotes( first_bars,
# get_topics: Callable, shm,
shm_token: Tuple[str, str, List[tuple]], symbol: str,
symbols: List[str] = ['XBTUSD', 'XMRUSD'], count: int = 75
# These are the symbols not expected by the ws api
# they are looked up inside this routine.
sub_type: str = 'ohlc',
loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. async with get_client() as client:
"""
next_dt = first_bars[0][1]
i = 0
while i < count:
try:
bars_array = await client.bars(
symbol=symbol,
since=arrow.get(next_dt).floor('minute')
.shift(minutes=-720).timestamp
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
await trio.sleep(5)
except BaseException as e:
log.exception(e)
await tractor.breakpoint()
_local_buffer_writers = {}
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
@tractor.stream
async def stream_quotes(
ctx: tractor.Context,
symbols: List[str],
shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None,
# compat for @tractor.msg.pub
topics: Any = None
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {} ws_pairs = {}
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
async with get_client() as client: async with get_client() as client:
# keep client cached for real-time section # keep client cached for real-time section
for sym in symbols: for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0] symbol = symbols[0]
if not writer_exists: if not writer_already_exists:
shm = attach_shm_array( shm = attach_shm_array(
token=shm_token, token=shm_token,
# we are writer # we are writer
@ -334,16 +377,19 @@ async def stream_quotes(
shm.push(bars) shm.push(bars)
shm_token = shm.token shm_token = shm.token
ln.start_soon(fill_bars, bars, shm, symbol)
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
yield shm_token, not writer_exists # pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_already_exists))
while True: while True:
try: try:
async with trio_websocket.open_websocket_url( async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com/', 'wss://ws.kraken.com',
) as ws: ) as ws:
# XXX: setup subs # XXX: setup subs
@ -379,7 +425,7 @@ async def stream_quotes(
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
# packetize as {topic: quote} # packetize as {topic: quote}
yield {topic: quote} await ctx.send_yield({topic: quote})
# keep start of last interval for volume tracking # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime last_interval_start = ohlc_last.etime
@ -416,7 +462,7 @@ async def stream_quotes(
# if we are the lone tick writer start writing # if we are the lone tick writer start writing
# the buffer with appropriate trade data # the buffer with appropriate trade data
if not writer_exists: if not writer_already_exists:
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
@ -453,7 +499,8 @@ async def stream_quotes(
# XXX: format required by ``tractor.msg.pub`` # XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]`` # requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote} await ctx.send_yield({topic: quote})
except (ConnectionClosed, DisconnectionTimeout): except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting") log.exception("Good job kraken...reconnecting")