Drop `pandas` use in ib backend for history

Found an issue (that was predictably brushed aside XD) where the
`ib_insync.util.df()` helper was changing the timestamps on bars data to
be way off (probably a `pandas.Timestamp` timezone thing?).
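
The gist of the fix: convert each bar's `.date` straight to an epoch
float and never round-trip through a dataframe, so no tz normalization
can kick in. A minimal standalone sketch of the idea (not code from
this commit):

    from datetime import datetime, timezone

    # a tz-aware bar datetime like ``ib_insync`` hands back
    bar_dt = datetime(2022, 3, 29, 9, 30, tzinfo=timezone.utc)

    # ``.timestamp()`` is unambiguous: seconds since the UTC epoch,
    # regardless of the source timezone
    epoch: float = bar_dt.timestamp()
    assert datetime.fromtimestamp(epoch, tz=timezone.utc) == bar_dt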

Anyway, dropped all that (which will hopefully let us drop `pandas` as
a hard dep) and added a buncha timestamp checks as well as start/end
datetime return values (via `pendulum`) so that consumer code can know
which "slice" of history was returned.

Also added some WIP code to work around "no history found" request
errors: instead of bailing we now step the query's end time back
another 2000 seconds and retry - not sure if this is actually correct
yet.
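
The retry dance boils down to roughly this (sketch only; `proxy.bars()`
is this backend's method-proxy call and the error code/message matching
mirrors the handler in the diff below):

    import pendulum
    from ib_insync.wrapper import RequestError

    async def bars_with_backoff(
        proxy,
        fqsn: str,
        end_dt: pendulum.DateTime,
    ):
        for _ in range(10):
            try:
                return await proxy.bars(fqsn=fqsn, end_dt=end_dt)

            except RequestError as err:
                if (
                    err.code == 162
                    and 'HMDS query returned no data' in err.message
                ):
                    # hit an "empty space" in history: decrement the
                    # end point and try to pull an older slice
                    end_dt = end_dt.subtract(seconds=2000)
                    continue
                raise
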
Tyler Goodlet 2022-03-29 10:36:40 -04:00
parent 3c5a799e97
commit e7b1d77b08
1 changed file with 105 additions and 81 deletions


@@ -23,7 +23,7 @@ built on it) and thus actor aware API calls must be spawned with
"""
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from dataclasses import asdict, astuple
from datetime import datetime
from functools import partial
import itertools
@@ -41,6 +41,7 @@ import platform
from random import randint
import time
import trio
from trio_typing import TaskStatus
import tractor
@@ -60,7 +61,7 @@ import numpy as np
from .. import config
from ..log import get_logger, get_console_log
from ..data._source import from_df
from ..data._source import base_ohlc_dtype
from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData
from ..clearing._messages import (
@@ -229,6 +230,28 @@ _exch_skip_list = {
_enters = 0
def bars_to_np(bars: list) -> np.ndarray:
'''
Convert a "bars list thing" (``BarsList`` type from ibis)
into a numpy struct array.
'''
# TODO: maybe rewrite this faster with ``numba``
np_ready = []
for bardata in bars:
ts = bardata.date.timestamp()
t = astuple(bardata)[:7]
np_ready.append((ts, ) + t[1:7])
nparr = np.array(
np_ready,
dtype=base_ohlc_dtype,
)
assert nparr['time'][0] == bars[0].date.timestamp()
assert nparr['time'][-1] == bars[-1].date.timestamp()
return nparr
class Client:
'''
IB wrapped for our broker backend API.
@@ -255,6 +278,7 @@ class Client:
async def bars(
self,
fqsn: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "",
@@ -262,7 +286,6 @@ class Client:
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> list[dict[str, Any]]:
'''
Retrieve OHLCV bars for a fqsn over a range to the present.
@@ -313,10 +336,8 @@ class Client:
# TODO: raise underlying error here
raise ValueError(f"No bars retrieved for {fqsn}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
return bars, from_df(df)
nparr = bars_to_np(bars)
return bars, nparr
async def con_deats(
self,
@@ -1214,7 +1235,6 @@ async def open_client_proxy() -> MethodProxy:
code = getattr(err, 'code', None)
if code:
msg = err.message
# await tractor.breakpoint()
# TODO: retrieve underlying ``ib_insync`` error?
if (
@@ -1362,7 +1382,9 @@ async def get_bars(
proxy: MethodProxy,
fqsn: str,
end_dt: str = "",
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
) -> (dict, np.ndarray):
'''
@@ -1370,73 +1392,68 @@ async def get_bars(
a ``MethodProxy``.
'''
_err: Optional[Exception] = None
import pendulum
fails = 0
bars: Optional[list] = None
in_throttle: bool = False
first_dt: datetime = None
last_dt: datetime = None
async def get():
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10):
try:
bars, bars_array = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
)
if bars_array is None:
raise SymbolNotFound(fqsn)
next_dt = bars[0].date
log.info(f'ib datetime {next_dt}')
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
return (bars, bars_array, next_dt), fails
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
in_throttle: bool = False
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(f'bars retrieved for dts {first_dt}:{last_dt}')
for _ in range(10):
try:
return await get()
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
# why do we always need to rebind this?
_err = err
# TODO: retrieve underlying ``ib_insync`` error?
if err.code == 162:
# TODO: so this error is normally raised (it seems) if
# we try to retrieve history for a time range for which
# there is none. in that case we should not only report
# the "empty range" but also do a iteration on the time
# step for ``next_dt`` to see if we can pull older
# history.
if 'HMDS query returned no data' in err.message:
# means we hit some kind of historical "empty space"
# and further requests will need to decrement the
# start time dt in order to not receive a further
# error?
# OLDER: seem to always cause throttling despite low rps
# TODO: if there are no bars returned from the first
# query we need to manually calculate the next step
# back and convert to an expected datetime format.
# if not bars:
# raise
# try to decrement start point and look further back
next_dt = bars[0].date
log.info(f'ib datetime {next_dt}')
continue
elif 'No market data permissions for' in err.message:
# _err = err
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(f'Symbol: {fqsn}')
break
elif (
err.code == 162
and 'HMDS query returned no data' in err.message
):
# try to decrement start point and look further back
end_dt = last_dt = last_dt.subtract(seconds=2000)
log.warning(
f'No data found ending @ {end_dt}\n'
f'Starting another request for {end_dt}'
)
continue
else:
log.exception(
"Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
)
print(_err)
# TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here
@@ -1452,6 +1469,7 @@ async def get_bars(
fails += 1
continue
return None, None
# else: # throttle wasn't fixed so error out immediately
# raise _err
@@ -1480,14 +1498,14 @@ async def open_history_client(
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(f'{end_dt}')
bars, bars_array, next_dt = out
bars, bars_array, first_dt, last_dt = out
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
return bars_array, next_dt
return bars_array, first_dt, last_dt
yield get_hist
@@ -1503,7 +1521,7 @@ async def backfill_bars(
# case the shm size will be driven by user config and available sys
# memory.
# count: int = 120,
count: int = 22,
count: int = 36,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -1515,6 +1533,9 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128
'''
# last_dt1 = None
last_dt = None
with trio.CancelScope() as cs:
# async with open_history_client(fqsn) as proxy:
@@ -1530,9 +1551,10 @@ async def backfill_bars(
if out is None:
raise RuntimeError("Could not pull current history?!")
(first_bars, bars_array, next_dt) = out
(first_bars, bars_array, first_dt, last_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
last_dt = first_dt
# write historical data to buffer
shm.push(bars_array)
@@ -1542,7 +1564,7 @@ async def backfill_bars(
i = 0
while i < count:
out, fails = await get_bars(proxy, fqsn, end_dt=next_dt)
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
if fails is None or fails > 1:
break
@@ -1551,10 +1573,12 @@ async def backfill_bars(
# could be trying to retrieve bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {next_dt}!?!?")
log.error(f"Can't grab bars starting at {first_dt}!?!?")
continue
bars, bars_array, next_dt = out
(first_bars, bars_array, first_dt, last_dt) = out
# last_dt1 = last_dt
# last_dt = first_dt
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
@@ -1787,7 +1811,7 @@ async def stream_quotes(
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(6):
with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
symbol=sym,