Pass `mkt: MktPair` to `.open_history_client()`

Since porting all backends to the new `FeedInit` + `MktPair` + `Asset`
style init, we can now just directly pass a `MktPair` instance to the
history endpoint(s) since it's always called *after* the live feed
`.stream_quotes()` ep B)

This has a lot of benefits including allowing brokerd backends to have
more flexible, pre-processed market endpoint meta-data that piker has
already validated; makes handling special cases in much more straight
forward as well such as forex pairs from legacy brokers XD

First pass changes all crypto backends to expect this new input, ib will
come next after handling said special cases..
master
Tyler Goodlet 2023-05-17 16:52:15 -04:00
parent 89e8a834bf
commit 907eaa68cb
5 changed files with 20 additions and 14 deletions

View File

@ -476,10 +476,12 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
symbol: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:

View File

@ -62,9 +62,10 @@ log = get_logger(__name__)
@acm @acm
async def open_history_client( async def open_history_client(
instrument: str, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:

View File

@ -25,8 +25,9 @@ from contextlib import (
from datetime import datetime from datetime import datetime
from typing import ( from typing import (
Any, Any,
Optional, AsyncGenerator,
Callable, Callable,
Optional,
) )
import time import time
@ -214,9 +215,11 @@ def normalize(
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, mkt: MktPair,
) -> tuple[Callable, int]: ) -> AsyncGenerator[Callable, None]:
symbol: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:

View File

@ -847,8 +847,12 @@ async def stream_messages(
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, mkt: MktPair,
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
symbol: str = mkt.bs_fqme
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
log.info('Attempting to open kucoin history client') log.info('Attempting to open kucoin history client')

View File

@ -39,7 +39,7 @@ import pendulum
import numpy as np import numpy as np
from .. import config from .. import config
from ..accounting._mktinfo import ( from ..accounting import (
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
@ -54,9 +54,6 @@ from ._sharedmem import (
ShmArray, ShmArray,
_secs_in_day, _secs_in_day,
) )
from ..accounting._mktinfo import (
unpack_fqme,
)
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
from ._sampling import ( from ._sampling import (
open_sample_stream, open_sample_stream,
@ -110,9 +107,8 @@ async def start_backfill(
] ]
config: dict[str, int] config: dict[str, int]
bs_fqme: str = mkt.bs_fqme
async with mod.open_history_client( async with mod.open_history_client(
bs_fqme, mkt,
) as (hist, config): ) as (hist, config):
# get latest query's worth of history all the way # get latest query's worth of history all the way
@ -143,7 +139,7 @@ async def start_backfill(
surr = array[-6:] surr = array[-6:]
diff_in_mins = round(diff/60., ndigits=2) diff_in_mins = round(diff/60., ndigits=2)
log.warning( log.warning(
f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n' f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n'
f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
'Surrounding 6 time stamps:\n' 'Surrounding 6 time stamps:\n'
f'{list(surr["time"])}\n' f'{list(surr["time"])}\n'
@ -257,7 +253,7 @@ async def start_backfill(
): ):
start_dt = min(starts) start_dt = min(starts)
log.warning( log.warning(
f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}" f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
) )
starts[start_dt] += 1 starts[start_dt] += 1
continue continue