From f6b7057b0d753ee47d93f60688ec3e67574ac9f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 12:44:58 -0500 Subject: [PATCH] `binance`: always request an extra 1min OHLC bar Seems that by default their history indexing rounds down/back to the previous time step, so make sure we add a minute inside `Client.bars()` when the `end_dt=None`, indicating "get the latest bar". Add a breakpoint block that should trigger whenever the latest bar vs. the latest epoch time is mismatched; we'll remove this after some testing verifying the history bars issue is resolved. Further this drops the legacy `backfill_bars()` endpoint which has been deprecated and unused for a while. --- piker/brokers/binance.py | 48 +++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index be3f35cf..5ea7860a 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -41,10 +41,15 @@ from ._util import ( SymbolNotFound, DataUnavailable, ) -from ..log import get_logger, get_console_log -from ..data import ShmArray +from ..log import ( + get_logger, + get_console_log, +) from ..data.types import Struct -from ..data._web_bs import open_autorecon_ws, NoBsWs +from ..data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) log = get_logger(__name__) @@ -142,7 +147,9 @@ class OHLC(Struct): # convert datetime obj timestamp to unixtime in milliseconds -def binance_timestamp(when): +def binance_timestamp( + when: datetime +) -> int: return int((when.timestamp() * 1000) + (when.microsecond / 1000)) @@ -238,7 +245,7 @@ class Client: ) -> dict: if end_dt is None: - end_dt = pendulum.now('UTC') + end_dt = pendulum.now('UTC').add(minutes=1) if start_dt is None: start_dt = end_dt.start_of( @@ -396,8 +403,8 @@ async def open_history_client( async def get_ohlc( timeframe: float, - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, @@ -412,27 +419,22 @@ async def open_history_client( start_dt=start_dt, end_dt=end_dt, ) - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) + times = array['time'] + if ( + end_dt is None + ): + inow = round(time.time()) + if (inow - times[-1]) > 60: + await tractor.breakpoint() + + start_dt = pendulum.from_timestamp(times[0]) + end_dt = pendulum.from_timestamp(times[-1]) + return array, start_dt, end_dt yield get_ohlc, {'erlangs': 3, 'rate': 3} -async def backfill_bars( - sym: str, - shm: ShmArray, # type: ignore # noqa - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> None: - """Fill historical bars into shared mem / storage afap. - """ - with trio.CancelScope() as cs: - async with open_cached_client('binance') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel,