`binance`: always request an extra 1min OHLC bar

Seems that by default their history indexing rounds down/back to the
previous time step, so make sure we add a minute inside `Client.bars()`
when the `end_dt=None`, indicating "get the latest bar". Add
a breakpoint block that should trigger whenever the latest bar vs. the
latest epoch time is mismatched; we'll remove this after some testing
verifying the history bars issue is resolved.

Further this drops the legacy `backfill_bars()` endpoint which has been
deprecated and unused for a while.
agg_feedz
Tyler Goodlet 2023-01-05 12:44:58 -05:00
parent 76f920a16b
commit f6b7057b0d
1 changed files with 25 additions and 23 deletions

View File

@ -41,10 +41,15 @@ from ._util import (
SymbolNotFound, SymbolNotFound,
DataUnavailable, DataUnavailable,
) )
from ..log import get_logger, get_console_log from ..log import (
from ..data import ShmArray get_logger,
get_console_log,
)
from ..data.types import Struct from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -142,7 +147,9 @@ class OHLC(Struct):
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp(when): def binance_timestamp(
when: datetime
) -> int:
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -238,7 +245,7 @@ class Client:
) -> dict: ) -> dict:
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC') end_dt = pendulum.now('UTC').add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
@ -396,8 +403,8 @@ async def open_history_client(
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: Optional[datetime] = None, start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
@ -412,27 +419,22 @@ async def open_history_client(
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
start_dt = pendulum.from_timestamp(array[0]['time']) times = array['time']
end_dt = pendulum.from_timestamp(array[-1]['time']) if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('binance') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,