From d9708e28c8d50f16ec47e530e3a7e3c48043f092 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Jun 2023 14:33:32 -0400 Subject: [PATCH] kraken: drop `OHLC.ticks` field and just inject to quote before send --- piker/brokers/kraken/feed.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 4830914f..26956a1c 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -65,7 +65,7 @@ from .api import ( ) -class OHLC(Struct): +class OHLC(Struct, frozen=True): ''' Description of the flattened OHLC quote format. @@ -76,6 +76,8 @@ class OHLC(Struct): chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair + + # unpacked from array time: float # Begin time of interval, in seconds since epoch etime: float # End time of interval, in seconds since epoch open: float # Open price of interval @@ -85,8 +87,6 @@ class OHLC(Struct): vwap: float # Volume weighted average price within interval volume: float # Accumulated volume **within interval** count: int # Number of trades within interval - # (sampled) generated tick data - ticks: list[Any] = [] async def stream_messages( @@ -150,14 +150,15 @@ async def process_data_feed_msgs( pair ]: if 'ohlc' in chan_name: + array: list = payload_array[0] ohlc = OHLC( chan_id, chan_name, pair, - *payload_array[0] + *map(float, array[:-1]), + count=array[-1], ) - ohlc.typecast() - yield 'ohlc', ohlc + yield 'ohlc', ohlc.copy() elif 'spread' in chan_name: @@ -430,7 +431,7 @@ async def stream_quotes( feed_is_live.set() # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + last_interval_start: float = ohlc_last.etime # start streaming topic: str = mkt.bs_fqme @@ -448,24 +449,23 @@ async def stream_quotes( # new OHLC sample interval if quote.etime > last_interval_start: - last_interval_start = quote.etime - tick_volume = volume + last_interval_start: float = quote.etime + tick_volume: float = volume else: # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume + tick_volume: float = volume - ohlc_last.volume ohlc_last = quote last = quote.close + quote = normalize(quote) if tick_volume: - quote.ticks.append({ + quote['ticks'] = [{ 'type': 'trade', 'price': last, 'size': tick_volume, - }) - - quote = normalize(quote) + }] case 'l1': # passthrough quote msg