diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 69b55dc1..48b28d6f 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -476,10 +476,12 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: @acm async def open_history_client( - symbol: str, + mkt: MktPair, ) -> tuple[Callable, int]: + symbol: str = mkt.bs_fqme + # TODO implement history getter for the new storage layer. async with open_cached_client('binance') as client: diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index deb0422f..a9420402 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -62,9 +62,10 @@ log = get_logger(__name__) @acm async def open_history_client( - instrument: str, + mkt: MktPair, ) -> tuple[Callable, int]: + fnstrument: str = mkt.bs_fqme # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 0cc24464..526590fe 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -25,8 +25,9 @@ from contextlib import ( from datetime import datetime from typing import ( Any, - Optional, + AsyncGenerator, Callable, + Optional, ) import time @@ -214,9 +215,11 @@ def normalize( @acm async def open_history_client( - symbol: str, + mkt: MktPair, -) -> tuple[Callable, int]: +) -> AsyncGenerator[Callable, None]: + + symbol: str = mkt.bs_fqme # TODO implement history getter for the new storage layer. async with open_cached_client('kraken') as client: diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 1e6d2cd0..3f8b71d0 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -847,8 +847,12 @@ async def stream_messages( @acm async def open_history_client( - symbol: str, + mkt: MktPair, + ) -> AsyncGenerator[Callable, None]: + + symbol: str = mkt.bs_fqme + async with open_cached_client('kucoin') as client: log.info('Attempting to open kucoin history client') diff --git a/piker/data/history.py b/piker/data/history.py index 00cc019e..182408f3 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -39,7 +39,7 @@ import pendulum import numpy as np from .. import config -from ..accounting._mktinfo import ( +from ..accounting import ( MktPair, unpack_fqme, ) @@ -54,9 +54,6 @@ from ._sharedmem import ( ShmArray, _secs_in_day, ) -from ..accounting._mktinfo import ( - unpack_fqme, -) from ._source import base_iohlc_dtype from ._sampling import ( open_sample_stream, @@ -110,9 +107,8 @@ async def start_backfill( ] config: dict[str, int] - bs_fqme: str = mkt.bs_fqme async with mod.open_history_client( - bs_fqme, + mkt, ) as (hist, config): # get latest query's worth of history all the way @@ -143,7 +139,7 @@ async def start_backfill( surr = array[-6:] diff_in_mins = round(diff/60., ndigits=2) log.warning( - f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n' + f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' 'Surrounding 6 time stamps:\n' f'{list(surr["time"])}\n' @@ -257,7 +253,7 @@ async def start_backfill( ): start_dt = min(starts) log.warning( - f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}" + f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}" ) starts[start_dt] += 1 continue