Compare commits
	
		
			2 Commits 
		
	
	
		
			310_plus
			...
			kraken_his
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						e56d065dbc | |
| 
							
							
								 | 
						b1fd986a3a | 
| 
						 | 
				
			
			@ -17,9 +17,11 @@
 | 
			
		|||
"""
 | 
			
		||||
Kraken backend.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import sys
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from dataclasses import dataclass, asdict, field
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
 | 
			
		||||
import json
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -290,170 +292,215 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @tractor.msg.pub
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
    # get_topics: Callable,
 | 
			
		||||
    shm_token: Tuple[str, str, List[tuple]],
 | 
			
		||||
    symbols: List[str] = ['XBTUSD', 'XMRUSD'],
 | 
			
		||||
    # These are the symbols not expected by the ws api
 | 
			
		||||
    # they are looked up inside this routine.
 | 
			
		||||
    sub_type: str = 'ohlc',
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
    # compat with eventual ``tractor.msg.pub``
 | 
			
		||||
    topics: Optional[List[str]] = None,
 | 
			
		||||
async def fill_bars(
 | 
			
		||||
    first_bars,
 | 
			
		||||
    shm,
 | 
			
		||||
    symbol: str,
 | 
			
		||||
    count: int = 75
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Subscribe for ohlc stream of quotes for ``pairs``.
 | 
			
		||||
 | 
			
		||||
    ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
 | 
			
		||||
    """
 | 
			
		||||
    async with get_client() as client:
 | 
			
		||||
 | 
			
		||||
        next_dt = first_bars[0][1]
 | 
			
		||||
        i = 0
 | 
			
		||||
        while i < count:
 | 
			
		||||
            
 | 
			
		||||
            try:
 | 
			
		||||
                bars_array = await client.bars(
 | 
			
		||||
                    symbol=symbol,
 | 
			
		||||
                    since=arrow.get(next_dt).floor('minute')
 | 
			
		||||
                        .shift(minutes=-720).timestamp
 | 
			
		||||
                )
 | 
			
		||||
                shm.push(bars_array, prepend=True)
 | 
			
		||||
                i += 1
 | 
			
		||||
                next_dt = bars_array[0][1]
 | 
			
		||||
 | 
			
		||||
                await trio.sleep(5)
 | 
			
		||||
 | 
			
		||||
            except BaseException as e:
 | 
			
		||||
                log.exception(e)
 | 
			
		||||
                await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_local_buffer_writers = {}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def activate_writer(key: str) -> (bool, trio.Nursery):
 | 
			
		||||
    try:
 | 
			
		||||
        writer_already_exists = _local_buffer_writers.get(key, False)
 | 
			
		||||
 | 
			
		||||
        if not writer_already_exists:
 | 
			
		||||
            _local_buffer_writers[key] = True
 | 
			
		||||
 | 
			
		||||
            async with trio.open_nursery() as n:
 | 
			
		||||
                yield writer_already_exists, n
 | 
			
		||||
        else:
 | 
			
		||||
            yield writer_already_exists, None
 | 
			
		||||
    finally:
 | 
			
		||||
        _local_buffer_writers.pop(key, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.stream
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    symbols: List[str],
 | 
			
		||||
    shm_token: Tuple[str, str, List[tuple]],
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
    # compat for @tractor.msg.pub
 | 
			
		||||
    topics: Any = None
 | 
			
		||||
) -> AsyncIterator[Dict[str, Any]]:
 | 
			
		||||
    
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
    
 | 
			
		||||
    ws_pairs = {}
 | 
			
		||||
    async with get_client() as client:
 | 
			
		||||
    async with activate_writer(
 | 
			
		||||
        shm_token['shm_name']
 | 
			
		||||
    ) as (writer_already_exists, ln):        
 | 
			
		||||
        async with get_client() as client:
 | 
			
		||||
 
 | 
			
		||||
        # keep client cached for real-time section
 | 
			
		||||
        for sym in symbols:
 | 
			
		||||
            ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
 | 
			
		||||
            # keep client cached for real-time section
 | 
			
		||||
            for sym in symbols:
 | 
			
		||||
                ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
 | 
			
		||||
 | 
			
		||||
        # maybe load historical ohlcv in to shared mem
 | 
			
		||||
        # check if shm has already been created by previous
 | 
			
		||||
        # feed initialization
 | 
			
		||||
        writer_exists = get_shm_token(shm_token['shm_name'])
 | 
			
		||||
            symbol = symbols[0]
 | 
			
		||||
 | 
			
		||||
        symbol = symbols[0]
 | 
			
		||||
            if not writer_already_exists:
 | 
			
		||||
                shm = attach_shm_array(
 | 
			
		||||
                    token=shm_token,
 | 
			
		||||
                    # we are writer
 | 
			
		||||
                    readonly=False,
 | 
			
		||||
                )
 | 
			
		||||
                bars = await client.bars(symbol=symbol)
 | 
			
		||||
 | 
			
		||||
        if not writer_exists:
 | 
			
		||||
            shm = attach_shm_array(
 | 
			
		||||
                token=shm_token,
 | 
			
		||||
                # we are writer
 | 
			
		||||
                readonly=False,
 | 
			
		||||
            )
 | 
			
		||||
            bars = await client.bars(symbol=symbol)
 | 
			
		||||
                shm.push(bars)
 | 
			
		||||
                shm_token = shm.token
 | 
			
		||||
 | 
			
		||||
            shm.push(bars)
 | 
			
		||||
            shm_token = shm.token
 | 
			
		||||
                ln.start_soon(fill_bars, bars, shm, symbol)            
 | 
			
		||||
 | 
			
		||||
            times = shm.array['time']
 | 
			
		||||
            delay_s = times[-1] - times[times != times[-1]][-1]
 | 
			
		||||
            subscribe_ohlc_for_increment(shm, delay_s)
 | 
			
		||||
                times = shm.array['time']
 | 
			
		||||
                delay_s = times[-1] - times[times != times[-1]][-1]
 | 
			
		||||
                subscribe_ohlc_for_increment(shm, delay_s)
 | 
			
		||||
   
 | 
			
		||||
        yield shm_token, not writer_exists
 | 
			
		||||
            # pass back token, and bool, signalling if we're the writer
 | 
			
		||||
            await ctx.send_yield((shm_token, not writer_already_exists))
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                async with trio_websocket.open_websocket_url(
 | 
			
		||||
                    'wss://ws.kraken.com/',
 | 
			
		||||
                ) as ws:
 | 
			
		||||
            while True:
 | 
			
		||||
                try:
 | 
			
		||||
                    async with trio_websocket.open_websocket_url(
 | 
			
		||||
                        'wss://ws.kraken.com',
 | 
			
		||||
                    ) as ws:
 | 
			
		||||
                    
 | 
			
		||||
                    # XXX: setup subs
 | 
			
		||||
                    # https://docs.kraken.com/websockets/#message-subscribe
 | 
			
		||||
                    # specific logic for this in kraken's shitty sync client:
 | 
			
		||||
                    # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
			
		||||
                    ohlc_sub = make_sub(
 | 
			
		||||
                        list(ws_pairs.values()),
 | 
			
		||||
                        {'name': 'ohlc', 'interval': 1}
 | 
			
		||||
                    )
 | 
			
		||||
                        # XXX: setup subs
 | 
			
		||||
                        # https://docs.kraken.com/websockets/#message-subscribe
 | 
			
		||||
                        # specific logic for this in kraken's shitty sync client:
 | 
			
		||||
                        # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
			
		||||
                        ohlc_sub = make_sub(
 | 
			
		||||
                            list(ws_pairs.values()),
 | 
			
		||||
                            {'name': 'ohlc', 'interval': 1}
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                    # TODO: we want to eventually allow unsubs which should
 | 
			
		||||
                    # be completely fine to request from a separate task
 | 
			
		||||
                    # since internally the ws methods appear to be FIFO
 | 
			
		||||
                    # locked.
 | 
			
		||||
                    await ws.send_message(json.dumps(ohlc_sub))
 | 
			
		||||
                        # TODO: we want to eventually allow unsubs which should
 | 
			
		||||
                        # be completely fine to request from a separate task
 | 
			
		||||
                        # since internally the ws methods appear to be FIFO
 | 
			
		||||
                        # locked.
 | 
			
		||||
                        await ws.send_message(json.dumps(ohlc_sub))
 | 
			
		||||
 | 
			
		||||
                    # trade data (aka L1)
 | 
			
		||||
                    l1_sub = make_sub(
 | 
			
		||||
                        list(ws_pairs.values()),
 | 
			
		||||
                        {'name': 'spread'}  # 'depth': 10}
 | 
			
		||||
                        # trade data (aka L1)
 | 
			
		||||
                        l1_sub = make_sub(
 | 
			
		||||
                            list(ws_pairs.values()),
 | 
			
		||||
                            {'name': 'spread'}  # 'depth': 10}
 | 
			
		||||
 | 
			
		||||
                    )
 | 
			
		||||
                    await ws.send_message(json.dumps(l1_sub))
 | 
			
		||||
                        )
 | 
			
		||||
                        await ws.send_message(json.dumps(l1_sub))
 | 
			
		||||
 | 
			
		||||
                    async def recv():
 | 
			
		||||
                        return json.loads(await ws.get_message())
 | 
			
		||||
                        async def recv():
 | 
			
		||||
                            return json.loads(await ws.get_message())
 | 
			
		||||
 | 
			
		||||
                    # pull a first quote and deliver
 | 
			
		||||
                    msg_gen = recv_msg(recv)
 | 
			
		||||
                    typ, ohlc_last = await msg_gen.__anext__()
 | 
			
		||||
                        # pull a first quote and deliver
 | 
			
		||||
                        msg_gen = recv_msg(recv)
 | 
			
		||||
                        typ, ohlc_last = await msg_gen.__anext__()
 | 
			
		||||
 | 
			
		||||
                    topic, quote = normalize(ohlc_last)
 | 
			
		||||
                        topic, quote = normalize(ohlc_last)
 | 
			
		||||
 | 
			
		||||
                    # packetize as {topic: quote}
 | 
			
		||||
                    yield {topic: quote}
 | 
			
		||||
                        # packetize as {topic: quote}
 | 
			
		||||
                        await ctx.send_yield({topic: quote})
 | 
			
		||||
 | 
			
		||||
                    # keep start of last interval for volume tracking
 | 
			
		||||
                    last_interval_start = ohlc_last.etime
 | 
			
		||||
                        # keep start of last interval for volume tracking
 | 
			
		||||
                        last_interval_start = ohlc_last.etime
 | 
			
		||||
 | 
			
		||||
                    # start streaming
 | 
			
		||||
                    async for typ, ohlc in msg_gen:
 | 
			
		||||
                        # start streaming
 | 
			
		||||
                        async for typ, ohlc in msg_gen:
 | 
			
		||||
 | 
			
		||||
                        if typ == 'ohlc':
 | 
			
		||||
                            if typ == 'ohlc':
 | 
			
		||||
 | 
			
		||||
                            # TODO: can get rid of all this by using
 | 
			
		||||
                            # ``trades`` subscription...
 | 
			
		||||
                                # TODO: can get rid of all this by using
 | 
			
		||||
                                # ``trades`` subscription...
 | 
			
		||||
 | 
			
		||||
                            # generate tick values to match time & sales pane:
 | 
			
		||||
                            # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
 | 
			
		||||
                            volume = ohlc.volume
 | 
			
		||||
                                # generate tick values to match time & sales pane:
 | 
			
		||||
                                # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
 | 
			
		||||
                                volume = ohlc.volume
 | 
			
		||||
 | 
			
		||||
                            # new interval
 | 
			
		||||
                            if ohlc.etime > last_interval_start:
 | 
			
		||||
                                last_interval_start = ohlc.etime
 | 
			
		||||
                                tick_volume = volume
 | 
			
		||||
                            else:
 | 
			
		||||
                                # this is the tick volume *within the interval*
 | 
			
		||||
                                tick_volume = volume - ohlc_last.volume
 | 
			
		||||
                                # new interval
 | 
			
		||||
                                if ohlc.etime > last_interval_start:
 | 
			
		||||
                                    last_interval_start = ohlc.etime
 | 
			
		||||
                                    tick_volume = volume
 | 
			
		||||
                                else:
 | 
			
		||||
                                    # this is the tick volume *within the interval*
 | 
			
		||||
                                    tick_volume = volume - ohlc_last.volume
 | 
			
		||||
 | 
			
		||||
                            last = ohlc.close
 | 
			
		||||
                            if tick_volume:
 | 
			
		||||
                                ohlc.ticks.append({
 | 
			
		||||
                                    'type': 'trade',
 | 
			
		||||
                                    'price': last,
 | 
			
		||||
                                    'size': tick_volume,
 | 
			
		||||
                                })
 | 
			
		||||
                                last = ohlc.close
 | 
			
		||||
                                if tick_volume:
 | 
			
		||||
                                    ohlc.ticks.append({
 | 
			
		||||
                                        'type': 'trade',
 | 
			
		||||
                                        'price': last,
 | 
			
		||||
                                        'size': tick_volume,
 | 
			
		||||
                                    })
 | 
			
		||||
 | 
			
		||||
                            topic, quote = normalize(ohlc)
 | 
			
		||||
                                topic, quote = normalize(ohlc)
 | 
			
		||||
 | 
			
		||||
                            # if we are the lone tick writer start writing
 | 
			
		||||
                            # the buffer with appropriate trade data
 | 
			
		||||
                            if not writer_exists:
 | 
			
		||||
                                # update last entry
 | 
			
		||||
                                # benchmarked in the 4-5 us range
 | 
			
		||||
                                o, high, low, v = shm.array[-1][
 | 
			
		||||
                                    ['open', 'high', 'low', 'volume']
 | 
			
		||||
                                ]
 | 
			
		||||
                                new_v = tick_volume
 | 
			
		||||
                                # if we are the lone tick writer start writing
 | 
			
		||||
                                # the buffer with appropriate trade data
 | 
			
		||||
                                if not writer_already_exists:
 | 
			
		||||
                                    # update last entry
 | 
			
		||||
                                    # benchmarked in the 4-5 us range
 | 
			
		||||
                                    o, high, low, v = shm.array[-1][
 | 
			
		||||
                                        ['open', 'high', 'low', 'volume']
 | 
			
		||||
                                    ]
 | 
			
		||||
                                    new_v = tick_volume
 | 
			
		||||
 | 
			
		||||
                                if v == 0 and new_v:
 | 
			
		||||
                                    # no trades for this bar yet so the open
 | 
			
		||||
                                    # is also the close/last trade price
 | 
			
		||||
                                    o = last
 | 
			
		||||
                                    if v == 0 and new_v:
 | 
			
		||||
                                        # no trades for this bar yet so the open
 | 
			
		||||
                                        # is also the close/last trade price
 | 
			
		||||
                                        o = last
 | 
			
		||||
 | 
			
		||||
                                # write shm
 | 
			
		||||
                                shm.array[
 | 
			
		||||
                                    ['open',
 | 
			
		||||
                                     'high',
 | 
			
		||||
                                     'low',
 | 
			
		||||
                                     'close',
 | 
			
		||||
                                     'bar_wap',  # in this case vwap of bar
 | 
			
		||||
                                     'volume']
 | 
			
		||||
                                ][-1] = (
 | 
			
		||||
                                    o,
 | 
			
		||||
                                    max(high, last),
 | 
			
		||||
                                    min(low, last),
 | 
			
		||||
                                    last,
 | 
			
		||||
                                    ohlc.vwap,
 | 
			
		||||
                                    volume,
 | 
			
		||||
                                )
 | 
			
		||||
                            ohlc_last = ohlc
 | 
			
		||||
                                    # write shm
 | 
			
		||||
                                    shm.array[
 | 
			
		||||
                                        ['open',
 | 
			
		||||
                                         'high',
 | 
			
		||||
                                         'low',
 | 
			
		||||
                                         'close',
 | 
			
		||||
                                         'bar_wap',  # in this case vwap of bar
 | 
			
		||||
                                         'volume']
 | 
			
		||||
                                    ][-1] = (
 | 
			
		||||
                                        o,
 | 
			
		||||
                                        max(high, last),
 | 
			
		||||
                                        min(low, last),
 | 
			
		||||
                                        last,
 | 
			
		||||
                                        ohlc.vwap,
 | 
			
		||||
                                        volume,
 | 
			
		||||
                                    )
 | 
			
		||||
                                ohlc_last = ohlc
 | 
			
		||||
 | 
			
		||||
                        elif typ == 'l1':
 | 
			
		||||
                            quote = ohlc
 | 
			
		||||
                            topic = quote['symbol']
 | 
			
		||||
                            elif typ == 'l1':
 | 
			
		||||
                                quote = ohlc
 | 
			
		||||
                                topic = quote['symbol']
 | 
			
		||||
 | 
			
		||||
                        # XXX: format required by ``tractor.msg.pub``
 | 
			
		||||
                        # requires a ``Dict[topic: str, quote: dict]``
 | 
			
		||||
                        yield {topic: quote}
 | 
			
		||||
                            # XXX: format required by ``tractor.msg.pub``
 | 
			
		||||
                            # requires a ``Dict[topic: str, quote: dict]``
 | 
			
		||||
                            await ctx.send_yield({topic: quote})
 | 
			
		||||
 | 
			
		||||
                except (ConnectionClosed, DisconnectionTimeout):
 | 
			
		||||
                    log.exception("Good job kraken...reconnecting")
 | 
			
		||||
 | 
			
		||||
            except (ConnectionClosed, DisconnectionTimeout):
 | 
			
		||||
                log.exception("Good job kraken...reconnecting")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue