Compare commits
	
		
			2 Commits 
		
	
	
		
			310_plus
			...
			kraken_his
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						e56d065dbc | |
| 
							
							
								 | 
						b1fd986a3a | 
| 
						 | 
					@ -17,9 +17,11 @@
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
Kraken backend.
 | 
					Kraken backend.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import sys
 | 
				
			||||||
from contextlib import asynccontextmanager
 | 
					from contextlib import asynccontextmanager
 | 
				
			||||||
from dataclasses import dataclass, asdict, field
 | 
					from dataclasses import dataclass, asdict, field
 | 
				
			||||||
from typing import List, Dict, Any, Tuple, Optional
 | 
					from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -290,170 +292,215 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# @tractor.msg.pub
 | 
					async def fill_bars(
 | 
				
			||||||
async def stream_quotes(
 | 
					    first_bars,
 | 
				
			||||||
    # get_topics: Callable,
 | 
					    shm,
 | 
				
			||||||
    shm_token: Tuple[str, str, List[tuple]],
 | 
					    symbol: str,
 | 
				
			||||||
    symbols: List[str] = ['XBTUSD', 'XMRUSD'],
 | 
					    count: int = 75
 | 
				
			||||||
    # These are the symbols not expected by the ws api
 | 
					 | 
				
			||||||
    # they are looked up inside this routine.
 | 
					 | 
				
			||||||
    sub_type: str = 'ohlc',
 | 
					 | 
				
			||||||
    loglevel: str = None,
 | 
					 | 
				
			||||||
    # compat with eventual ``tractor.msg.pub``
 | 
					 | 
				
			||||||
    topics: Optional[List[str]] = None,
 | 
					 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    """Subscribe for ohlc stream of quotes for ``pairs``.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
					 | 
				
			||||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ws_pairs = {}
 | 
					 | 
				
			||||||
    async with get_client() as client:
 | 
					    async with get_client() as client:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # keep client cached for real-time section
 | 
					        next_dt = first_bars[0][1]
 | 
				
			||||||
        for sym in symbols:
 | 
					        i = 0
 | 
				
			||||||
            ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
 | 
					        while i < count:
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
        # maybe load historical ohlcv in to shared mem
 | 
					 | 
				
			||||||
        # check if shm has already been created by previous
 | 
					 | 
				
			||||||
        # feed initialization
 | 
					 | 
				
			||||||
        writer_exists = get_shm_token(shm_token['shm_name'])
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        symbol = symbols[0]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if not writer_exists:
 | 
					 | 
				
			||||||
            shm = attach_shm_array(
 | 
					 | 
				
			||||||
                token=shm_token,
 | 
					 | 
				
			||||||
                # we are writer
 | 
					 | 
				
			||||||
                readonly=False,
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            bars = await client.bars(symbol=symbol)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            shm.push(bars)
 | 
					 | 
				
			||||||
            shm_token = shm.token
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            times = shm.array['time']
 | 
					 | 
				
			||||||
            delay_s = times[-1] - times[times != times[-1]][-1]
 | 
					 | 
				
			||||||
            subscribe_ohlc_for_increment(shm, delay_s)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        yield shm_token, not writer_exists
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        while True:
 | 
					 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                async with trio_websocket.open_websocket_url(
 | 
					                bars_array = await client.bars(
 | 
				
			||||||
                    'wss://ws.kraken.com/',
 | 
					                    symbol=symbol,
 | 
				
			||||||
                ) as ws:
 | 
					                    since=arrow.get(next_dt).floor('minute')
 | 
				
			||||||
 | 
					                        .shift(minutes=-720).timestamp
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                shm.push(bars_array, prepend=True)
 | 
				
			||||||
 | 
					                i += 1
 | 
				
			||||||
 | 
					                next_dt = bars_array[0][1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # XXX: setup subs
 | 
					                await trio.sleep(5)
 | 
				
			||||||
                    # https://docs.kraken.com/websockets/#message-subscribe
 | 
					 | 
				
			||||||
                    # specific logic for this in kraken's shitty sync client:
 | 
					 | 
				
			||||||
                    # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
					 | 
				
			||||||
                    ohlc_sub = make_sub(
 | 
					 | 
				
			||||||
                        list(ws_pairs.values()),
 | 
					 | 
				
			||||||
                        {'name': 'ohlc', 'interval': 1}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # TODO: we want to eventually allow unsubs which should
 | 
					            except BaseException as e:
 | 
				
			||||||
                    # be completely fine to request from a separate task
 | 
					                log.exception(e)
 | 
				
			||||||
                    # since internally the ws methods appear to be FIFO
 | 
					                await tractor.breakpoint()
 | 
				
			||||||
                    # locked.
 | 
					 | 
				
			||||||
                    await ws.send_message(json.dumps(ohlc_sub))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # trade data (aka L1)
 | 
					 | 
				
			||||||
                    l1_sub = make_sub(
 | 
					 | 
				
			||||||
                        list(ws_pairs.values()),
 | 
					 | 
				
			||||||
                        {'name': 'spread'}  # 'depth': 10}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    )
 | 
					_local_buffer_writers = {}
 | 
				
			||||||
                    await ws.send_message(json.dumps(l1_sub))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    async def recv():
 | 
					 | 
				
			||||||
                        return json.loads(await ws.get_message())
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # pull a first quote and deliver
 | 
					@asynccontextmanager
 | 
				
			||||||
                    msg_gen = recv_msg(recv)
 | 
					async def activate_writer(key: str) -> (bool, trio.Nursery):
 | 
				
			||||||
                    typ, ohlc_last = await msg_gen.__anext__()
 | 
					    try:
 | 
				
			||||||
 | 
					        writer_already_exists = _local_buffer_writers.get(key, False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    topic, quote = normalize(ohlc_last)
 | 
					        if not writer_already_exists:
 | 
				
			||||||
 | 
					            _local_buffer_writers[key] = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # packetize as {topic: quote}
 | 
					            async with trio.open_nursery() as n:
 | 
				
			||||||
                    yield {topic: quote}
 | 
					                yield writer_already_exists, n
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            yield writer_already_exists, None
 | 
				
			||||||
 | 
					    finally:
 | 
				
			||||||
 | 
					        _local_buffer_writers.pop(key, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # keep start of last interval for volume tracking
 | 
					 | 
				
			||||||
                    last_interval_start = ohlc_last.etime
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # start streaming
 | 
					@tractor.stream
 | 
				
			||||||
                    async for typ, ohlc in msg_gen:
 | 
					async def stream_quotes(
 | 
				
			||||||
 | 
					    ctx: tractor.Context,
 | 
				
			||||||
 | 
					    symbols: List[str],
 | 
				
			||||||
 | 
					    shm_token: Tuple[str, str, List[tuple]],
 | 
				
			||||||
 | 
					    loglevel: str = None,
 | 
				
			||||||
 | 
					    # compat for @tractor.msg.pub
 | 
				
			||||||
 | 
					    topics: Any = None
 | 
				
			||||||
 | 
					) -> AsyncIterator[Dict[str, Any]]:
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
				
			||||||
 | 
					    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    ws_pairs = {}
 | 
				
			||||||
 | 
					    async with activate_writer(
 | 
				
			||||||
 | 
					        shm_token['shm_name']
 | 
				
			||||||
 | 
					    ) as (writer_already_exists, ln):        
 | 
				
			||||||
 | 
					        async with get_client() as client:
 | 
				
			||||||
 | 
					 
 | 
				
			||||||
 | 
					            # keep client cached for real-time section
 | 
				
			||||||
 | 
					            for sym in symbols:
 | 
				
			||||||
 | 
					                ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if typ == 'ohlc':
 | 
					            symbol = symbols[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # TODO: can get rid of all this by using
 | 
					            if not writer_already_exists:
 | 
				
			||||||
                            # ``trades`` subscription...
 | 
					                shm = attach_shm_array(
 | 
				
			||||||
 | 
					                    token=shm_token,
 | 
				
			||||||
 | 
					                    # we are writer
 | 
				
			||||||
 | 
					                    readonly=False,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                bars = await client.bars(symbol=symbol)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # generate tick values to match time & sales pane:
 | 
					                shm.push(bars)
 | 
				
			||||||
                            # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
 | 
					                shm_token = shm.token
 | 
				
			||||||
                            volume = ohlc.volume
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # new interval
 | 
					                ln.start_soon(fill_bars, bars, shm, symbol)            
 | 
				
			||||||
                            if ohlc.etime > last_interval_start:
 | 
					 | 
				
			||||||
                                last_interval_start = ohlc.etime
 | 
					 | 
				
			||||||
                                tick_volume = volume
 | 
					 | 
				
			||||||
                            else:
 | 
					 | 
				
			||||||
                                # this is the tick volume *within the interval*
 | 
					 | 
				
			||||||
                                tick_volume = volume - ohlc_last.volume
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            last = ohlc.close
 | 
					                times = shm.array['time']
 | 
				
			||||||
                            if tick_volume:
 | 
					                delay_s = times[-1] - times[times != times[-1]][-1]
 | 
				
			||||||
                                ohlc.ticks.append({
 | 
					                subscribe_ohlc_for_increment(shm, delay_s)
 | 
				
			||||||
                                    'type': 'trade',
 | 
					   
 | 
				
			||||||
                                    'price': last,
 | 
					            # pass back token, and bool, signalling if we're the writer
 | 
				
			||||||
                                    'size': tick_volume,
 | 
					            await ctx.send_yield((shm_token, not writer_already_exists))
 | 
				
			||||||
                                })
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            topic, quote = normalize(ohlc)
 | 
					            while True:
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
 | 
					                    async with trio_websocket.open_websocket_url(
 | 
				
			||||||
 | 
					                        'wss://ws.kraken.com',
 | 
				
			||||||
 | 
					                    ) as ws:
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                        # XXX: setup subs
 | 
				
			||||||
 | 
					                        # https://docs.kraken.com/websockets/#message-subscribe
 | 
				
			||||||
 | 
					                        # specific logic for this in kraken's shitty sync client:
 | 
				
			||||||
 | 
					                        # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
				
			||||||
 | 
					                        ohlc_sub = make_sub(
 | 
				
			||||||
 | 
					                            list(ws_pairs.values()),
 | 
				
			||||||
 | 
					                            {'name': 'ohlc', 'interval': 1}
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # if we are the lone tick writer start writing
 | 
					                        # TODO: we want to eventually allow unsubs which should
 | 
				
			||||||
                            # the buffer with appropriate trade data
 | 
					                        # be completely fine to request from a separate task
 | 
				
			||||||
                            if not writer_exists:
 | 
					                        # since internally the ws methods appear to be FIFO
 | 
				
			||||||
                                # update last entry
 | 
					                        # locked.
 | 
				
			||||||
                                # benchmarked in the 4-5 us range
 | 
					                        await ws.send_message(json.dumps(ohlc_sub))
 | 
				
			||||||
                                o, high, low, v = shm.array[-1][
 | 
					 | 
				
			||||||
                                    ['open', 'high', 'low', 'volume']
 | 
					 | 
				
			||||||
                                ]
 | 
					 | 
				
			||||||
                                new_v = tick_volume
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                if v == 0 and new_v:
 | 
					                        # trade data (aka L1)
 | 
				
			||||||
                                    # no trades for this bar yet so the open
 | 
					                        l1_sub = make_sub(
 | 
				
			||||||
                                    # is also the close/last trade price
 | 
					                            list(ws_pairs.values()),
 | 
				
			||||||
                                    o = last
 | 
					                            {'name': 'spread'}  # 'depth': 10}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                # write shm
 | 
					                        )
 | 
				
			||||||
                                shm.array[
 | 
					                        await ws.send_message(json.dumps(l1_sub))
 | 
				
			||||||
                                    ['open',
 | 
					 | 
				
			||||||
                                     'high',
 | 
					 | 
				
			||||||
                                     'low',
 | 
					 | 
				
			||||||
                                     'close',
 | 
					 | 
				
			||||||
                                     'bar_wap',  # in this case vwap of bar
 | 
					 | 
				
			||||||
                                     'volume']
 | 
					 | 
				
			||||||
                                ][-1] = (
 | 
					 | 
				
			||||||
                                    o,
 | 
					 | 
				
			||||||
                                    max(high, last),
 | 
					 | 
				
			||||||
                                    min(low, last),
 | 
					 | 
				
			||||||
                                    last,
 | 
					 | 
				
			||||||
                                    ohlc.vwap,
 | 
					 | 
				
			||||||
                                    volume,
 | 
					 | 
				
			||||||
                                )
 | 
					 | 
				
			||||||
                            ohlc_last = ohlc
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        elif typ == 'l1':
 | 
					                        async def recv():
 | 
				
			||||||
                            quote = ohlc
 | 
					                            return json.loads(await ws.get_message())
 | 
				
			||||||
                            topic = quote['symbol']
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        # XXX: format required by ``tractor.msg.pub``
 | 
					                        # pull a first quote and deliver
 | 
				
			||||||
                        # requires a ``Dict[topic: str, quote: dict]``
 | 
					                        msg_gen = recv_msg(recv)
 | 
				
			||||||
                        yield {topic: quote}
 | 
					                        typ, ohlc_last = await msg_gen.__anext__()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        topic, quote = normalize(ohlc_last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        # packetize as {topic: quote}
 | 
				
			||||||
 | 
					                        await ctx.send_yield({topic: quote})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        # keep start of last interval for volume tracking
 | 
				
			||||||
 | 
					                        last_interval_start = ohlc_last.etime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        # start streaming
 | 
				
			||||||
 | 
					                        async for typ, ohlc in msg_gen:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            if typ == 'ohlc':
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                # TODO: can get rid of all this by using
 | 
				
			||||||
 | 
					                                # ``trades`` subscription...
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                # generate tick values to match time & sales pane:
 | 
				
			||||||
 | 
					                                # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
 | 
				
			||||||
 | 
					                                volume = ohlc.volume
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                # new interval
 | 
				
			||||||
 | 
					                                if ohlc.etime > last_interval_start:
 | 
				
			||||||
 | 
					                                    last_interval_start = ohlc.etime
 | 
				
			||||||
 | 
					                                    tick_volume = volume
 | 
				
			||||||
 | 
					                                else:
 | 
				
			||||||
 | 
					                                    # this is the tick volume *within the interval*
 | 
				
			||||||
 | 
					                                    tick_volume = volume - ohlc_last.volume
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                last = ohlc.close
 | 
				
			||||||
 | 
					                                if tick_volume:
 | 
				
			||||||
 | 
					                                    ohlc.ticks.append({
 | 
				
			||||||
 | 
					                                        'type': 'trade',
 | 
				
			||||||
 | 
					                                        'price': last,
 | 
				
			||||||
 | 
					                                        'size': tick_volume,
 | 
				
			||||||
 | 
					                                    })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                topic, quote = normalize(ohlc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                # if we are the lone tick writer start writing
 | 
				
			||||||
 | 
					                                # the buffer with appropriate trade data
 | 
				
			||||||
 | 
					                                if not writer_already_exists:
 | 
				
			||||||
 | 
					                                    # update last entry
 | 
				
			||||||
 | 
					                                    # benchmarked in the 4-5 us range
 | 
				
			||||||
 | 
					                                    o, high, low, v = shm.array[-1][
 | 
				
			||||||
 | 
					                                        ['open', 'high', 'low', 'volume']
 | 
				
			||||||
 | 
					                                    ]
 | 
				
			||||||
 | 
					                                    new_v = tick_volume
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                    if v == 0 and new_v:
 | 
				
			||||||
 | 
					                                        # no trades for this bar yet so the open
 | 
				
			||||||
 | 
					                                        # is also the close/last trade price
 | 
				
			||||||
 | 
					                                        o = last
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                    # write shm
 | 
				
			||||||
 | 
					                                    shm.array[
 | 
				
			||||||
 | 
					                                        ['open',
 | 
				
			||||||
 | 
					                                         'high',
 | 
				
			||||||
 | 
					                                         'low',
 | 
				
			||||||
 | 
					                                         'close',
 | 
				
			||||||
 | 
					                                         'bar_wap',  # in this case vwap of bar
 | 
				
			||||||
 | 
					                                         'volume']
 | 
				
			||||||
 | 
					                                    ][-1] = (
 | 
				
			||||||
 | 
					                                        o,
 | 
				
			||||||
 | 
					                                        max(high, last),
 | 
				
			||||||
 | 
					                                        min(low, last),
 | 
				
			||||||
 | 
					                                        last,
 | 
				
			||||||
 | 
					                                        ohlc.vwap,
 | 
				
			||||||
 | 
					                                        volume,
 | 
				
			||||||
 | 
					                                    )
 | 
				
			||||||
 | 
					                                ohlc_last = ohlc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            elif typ == 'l1':
 | 
				
			||||||
 | 
					                                quote = ohlc
 | 
				
			||||||
 | 
					                                topic = quote['symbol']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            # XXX: format required by ``tractor.msg.pub``
 | 
				
			||||||
 | 
					                            # requires a ``Dict[topic: str, quote: dict]``
 | 
				
			||||||
 | 
					                            await ctx.send_yield({topic: quote})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                except (ConnectionClosed, DisconnectionTimeout):
 | 
				
			||||||
 | 
					                    log.exception("Good job kraken...reconnecting")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            except (ConnectionClosed, DisconnectionTimeout):
 | 
					 | 
				
			||||||
                log.exception("Good job kraken...reconnecting")
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue