diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index add23b18..68c7238e 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -33,7 +33,6 @@ import asks from fuzzywuzzy import process as fuzzy import numpy as np import tractor -from pydantic.dataclasses import dataclass import wsproto from .._cacheables import open_cached_client @@ -106,14 +105,14 @@ class Pair(Struct, frozen=True): permissions: list[str] -@dataclass -class OHLC: - """Description of the flattened OHLC quote format. +class OHLC(Struct): + ''' + Description of the flattened OHLC quote format. For schema details see: https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams - """ + ''' time: int open: float @@ -262,6 +261,7 @@ class Client: for i, bar in enumerate(bars): bar = OHLC(*bar) + bar.typecast() row = [] for j, (name, ftype) in enumerate(_ohlc_dtype[1:]): diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index b08d9f52..36b0199a 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -19,7 +19,6 @@ Kraken web API wrapping. ''' from contextlib import asynccontextmanager as acm -from dataclasses import field from datetime import datetime import itertools from typing import ( @@ -34,7 +33,6 @@ import pendulum import asks from fuzzywuzzy import process as fuzzy import numpy as np -from pydantic.dataclasses import dataclass import urllib.parse import hashlib import hmac @@ -78,31 +76,6 @@ _symbol_info_translation: dict[str, str] = { } -@dataclass -class OHLC: - ''' - Description of the flattened OHLC quote format. - - For schema details see: - https://docs.kraken.com/websockets/#message-ohlc - - ''' - chan_id: int # internal kraken id - chan_name: str # eg. ohlc-1 (name-interval) - pair: str # fx pair - time: float # Begin time of interval, in seconds since epoch - etime: float # End time of interval, in seconds since epoch - open: float # Open price of interval - high: float # High price within interval - low: float # Low price within interval - close: float # Close price of interval - vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume **within interval** - count: int # Number of trades within interval - # (sampled) generated tick data - ticks: list[Any] = field(default_factory=list) - - def get_config() -> dict[str, Any]: conf, path = config.load() diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index e8cfd9b6..0f41a3ec 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -19,7 +19,6 @@ Real-time and historical data feed endpoints. ''' from contextlib import asynccontextmanager as acm -from dataclasses import asdict from datetime import datetime from typing import ( Any, @@ -50,7 +49,6 @@ from piker.data._web_bs import open_autorecon_ws, NoBsWs from . import log from .api import ( Client, - OHLC, ) @@ -88,6 +86,30 @@ class Pair(Struct): ordermin: float # minimum order volume for pair +class OHLC(Struct): + ''' + Description of the flattened OHLC quote format. + + For schema details see: + https://docs.kraken.com/websockets/#message-ohlc + + ''' + chan_id: int # internal kraken id + chan_name: str # eg. ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: float # Accumulated volume **within interval** + count: int # Number of trades within interval + # (sampled) generated tick data + ticks: list[Any] = [] + + async def stream_messages( ws: NoBsWs, ): @@ -176,12 +198,14 @@ async def process_data_feed_msgs( pair ]: if 'ohlc' in chan_name: - yield 'ohlc', OHLC( + ohlc = OHLC( chan_id, chan_name, pair, *payload_array[0] ) + ohlc.typecast() + yield 'ohlc', ohlc elif 'spread' in chan_name: @@ -214,7 +238,7 @@ def normalize( ohlc: OHLC, ) -> dict: - quote = asdict(ohlc) + quote = ohlc.to_dict() quote['broker_ts'] = quote['time'] quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') diff --git a/piker/data/types.py b/piker/data/types.py index c6cba61d..d8926610 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -66,3 +66,10 @@ class Struct( ).decode( msgspec.msgpack.Encoder().encode(self) ) + + def typecast( + self, + # fields: Optional[list[str]] = None, + ) -> None: + for fname, ftype in self.__annotations__.items(): + setattr(self, fname, ftype(getattr(self, fname)))