From d976f3d074d684f8821cdd9ce3d61a33628faa6a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Aug 2020 16:52:51 -0400 Subject: [PATCH] Generate tick data correctly using .etime --- piker/brokers/kraken.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0ec14b40..a0b3767c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -14,7 +14,7 @@ import numpy as np import tractor from ._util import resproc, SymbolNotFound, BrokerError -from ..log import get_logger +from ..log import get_logger, get_console_log log = get_logger(__name__) @@ -119,11 +119,15 @@ async def stream_quotes( # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], sub_type: str = 'ohlc', + loglevel: str = None, ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. """ + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + ws_pairs = {} async with get_client() as client: for sym in symbols: @@ -175,7 +179,7 @@ async def stream_quotes( low: float # Low price within interval close: float # Close price of interval vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume within interval + volume: float # Accumulated volume **within interval** count: int # Number of trades within interval # (sampled) generated tick data ticks: List[Any] = field(default_factory=list) @@ -191,16 +195,28 @@ async def stream_quotes( ohlc_last = await ohlc_gen.__anext__() yield asdict(ohlc_last) + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + async for ohlc in ohlc_gen: - # debug - print(ohlc) + + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m volume = ohlc.volume - vol_diff = volume - ohlc_last.volume - if vol_diff: + if ohlc.etime > last_interval_start: # new interval + log.debug( + f"New interval last: {ohlc_last.time}, now: {ohlc.time}") + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + if tick_volume: ohlc.ticks.append({ 'type': 'trade', 'price': ohlc.close, - 'size': vol_diff, + 'size': tick_volume, }) yield asdict(ohlc) ohlc_last = ohlc