From 907eaa68cb992c3a57f341983135377e2220b430 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 17 May 2023 16:52:15 -0400
Subject: [PATCH] Pass `mkt: MktPair` to `.open_history_client()`

Since porting all backends to the new `FeedInit` + `MktPair` + `Asset`
style init, we can now pass a `MktPair` instance directly to the
history endpoint(s), since it is always called *after* the live feed's
`.stream_quotes()` ep B)

This has a lot of benefits, including giving brokerd backends more
flexible, pre-processed market endpoint meta-data that piker has
already validated; it also makes handling special cases, such as forex
pairs from legacy brokers, much more straightforward XD

This first pass changes all the crypto backends to expect the new
input; ib will come next, once said special cases are handled. (A
sketch of the expected endpoint shape follows the diff below.)
---
 piker/brokers/binance.py      |  4 +++-
 piker/brokers/deribit/feed.py |  3 ++-
 piker/brokers/kraken/feed.py  |  9 ++++++---
 piker/brokers/kucoin.py       |  6 +++++-
 piker/data/history.py         | 12 ++++--------
 5 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py
index 69b55dc1..48b28d6f 100644
--- a/piker/brokers/binance.py
+++ b/piker/brokers/binance.py
@@ -476,10 +476,12 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
 
 @acm
 async def open_history_client(
-    symbol: str,
+    mkt: MktPair,
 
 ) -> tuple[Callable, int]:
 
+    symbol: str = mkt.bs_fqme
+
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('binance') as client:
 
diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py
index deb0422f..a9420402 100644
--- a/piker/brokers/deribit/feed.py
+++ b/piker/brokers/deribit/feed.py
@@ -62,9 +62,10 @@ log = get_logger(__name__)
 
 @acm
 async def open_history_client(
-    instrument: str,
+    mkt: MktPair,
 
 ) -> tuple[Callable, int]:
 
+    instrument: str = mkt.bs_fqme
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('deribit') as client:
 
diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py
index 0cc24464..526590fe 100644
--- a/piker/brokers/kraken/feed.py
+++ b/piker/brokers/kraken/feed.py
@@ -25,8 +25,9 @@ from contextlib import (
 from datetime import datetime
 from typing import (
     Any,
-    Optional,
+    AsyncGenerator,
     Callable,
+    Optional,
 )
 import time
 
@@ -214,9 +215,11 @@ def normalize(
 
 @acm
 async def open_history_client(
-    symbol: str,
+    mkt: MktPair,
 
-) -> tuple[Callable, int]:
+) -> AsyncGenerator[Callable, None]:
+
+    symbol: str = mkt.bs_fqme
 
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('kraken') as client:
 
diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py
index 1e6d2cd0..3f8b71d0 100755
--- a/piker/brokers/kucoin.py
+++ b/piker/brokers/kucoin.py
@@ -847,8 +847,12 @@ async def stream_messages(
 
 @acm
 async def open_history_client(
-    symbol: str,
+    mkt: MktPair,
+
 ) -> AsyncGenerator[Callable, None]:
+
+    symbol: str = mkt.bs_fqme
+
     async with open_cached_client('kucoin') as client:
         log.info('Attempting to open kucoin history client')
 
diff --git a/piker/data/history.py b/piker/data/history.py
index 00cc019e..182408f3 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -39,7 +39,7 @@ import pendulum
 import numpy as np
 
 from .. import config
-from ..accounting._mktinfo import (
+from ..accounting import (
     MktPair,
     unpack_fqme,
 )
@@ -54,9 +54,6 @@ from ._sharedmem import (
     ShmArray,
     _secs_in_day,
 )
-from ..accounting._mktinfo import (
-    unpack_fqme,
-)
 from ._source import base_iohlc_dtype
 from ._sampling import (
     open_sample_stream,
@@ -110,9 +107,8 @@ async def start_backfill(
     ]
     config: dict[str, int]
 
-    bs_fqme: str = mkt.bs_fqme
     async with mod.open_history_client(
-        bs_fqme,
+        mkt,
     ) as (hist, config):
 
         # get latest query's worth of history all the way
@@ -143,7 +139,7 @@ async def start_backfill(
             surr = array[-6:]
             diff_in_mins = round(diff/60., ndigits=2)
             log.warning(
-                f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n'
+                f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n'
                 f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
                 'Surrounding 6 time stamps:\n'
                 f'{list(surr["time"])}\n'
@@ -257,7 +253,7 @@ async def start_backfill(
             ):
                 start_dt = min(starts)
                 log.warning(
-                    f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}"
+                    f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
                 )
                 starts[start_dt] += 1
                 continue
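
For reference, here is a minimal sketch of the endpoint shape a crypto
backend exposes after this change, based only on the hunks above; the
loose typing of `mkt`, the `get_ohlc` body, and the config values in
the final `yield` are illustrative placeholders rather than any
backend's actual implementation:

# hypothetical backend-module sketch, not piker source
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
    AsyncGenerator,
    Callable,
)

import numpy as np


@acm
async def open_history_client(
    mkt,  # a piker `MktPair`; left untyped to keep this sketch standalone

) -> AsyncGenerator[tuple[Callable, dict], None]:

    # derive the broker-native symbol from the already-validated
    # market info instead of re-parsing an fqme string.
    symbol: str = mkt.bs_fqme

    async def get_ohlc(
        timeframe: float,
        end_dt: datetime | None = None,
        start_dt: datetime | None = None,

    ) -> tuple[np.ndarray, datetime, datetime]:
        # placeholder: a real backend queries its bars API here using
        # `symbol` and returns the frame plus its first/last bar times.
        raise NotImplementedError(symbol)

    # the second item is the per-backend query config consumed by
    # `start_backfill()`; the values here are made up.
    yield get_ohlc, {'erlangs': 3, 'rate': 3}

On the caller side, `start_backfill()` (last file in the diff) now just
does `async with mod.open_history_client(mkt) as (hist, config):` and
uses `mkt.fqme` wherever it needs a display string.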