ib: fix mega borked hist queries on gappy assets

Explains why stuff always seemed wrong before XD

Previously whenever a time-gappy asset (like a stock due to it's venue
operating hours) was being loaded, we weren't querying for a "durations
worth" of bars and this was causing all sorts of actual gaps in our
data set that shouldn't exist..

Fix that by always attempting to retrieve a min aggregate-time's
worth/duration of bars/datums in the history manager. Actually,
i implemented this in both the feed and api layers for this backend
since it doesn't seem to strictly work just implementing it at the
`Client.bars()` level, not sure why but..

Also, buncha `ruff` linting cleanups and fix the logger nameeee, lel.
basic_buy_bot
Tyler Goodlet 2023-06-08 19:34:34 -04:00
parent c8f8724887
commit 75ff3921b6
3 changed files with 71 additions and 31 deletions

View File

@ -21,22 +21,14 @@ runnable script-programs.
'''
from __future__ import annotations
from functools import partial
from typing import (
Literal,
TYPE_CHECKING,
)
from typing import Literal
import subprocess
import tractor
from .._util import log
if TYPE_CHECKING:
from .api import (
MethodProxy,
ib_Client
)
from .._util import get_logger
log = get_logger('piker.brokers.ib')
_reset_tech: Literal[
'vnc',

View File

@ -423,7 +423,7 @@ class Client:
# optional "duration of time" equal to the
# length of the returned history frame.
duration: Optional[str] = None,
duration: str | None = None,
**kwargs,
@ -475,6 +475,8 @@ class Client:
# whatToShow='MIDPOINT',
# whatToShow='TRADES',
)
# tail case if no history for range or none prior.
if not bars:
# NOTE: there's 2 cases here to handle (and this should be
# read alongside the implementation of
@ -489,6 +491,32 @@ class Client:
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
# NOTE XXX: ensure minimum duration in bars B)
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
elif (
end_dt
and (
(len(bars) * sample_period_s) < dt_duration.in_seconds()
)
):
log.warning(
f'Recursing to get more bars from {end_dt} for {dt_duration}'
)
end_dt -= dt_duration
(
r_bars,
r_arr,
r_duration,
) = await self.bars(
fqme,
start_dt=start_dt,
end_dt=end_dt,
)
r_bars.extend(bars)
bars = r_bars
nparr = bars_to_np(bars)
return bars, nparr, dt_duration
@ -921,7 +949,7 @@ class Client:
done, pending = await asyncio.wait(
[ready],
timeout=0.1,
timeout=0.01,
)
if ready in done:
break
@ -1401,7 +1429,7 @@ async def open_client_proxies() -> tuple[
# TODO: maybe this should be the default in tractor?
key=tractor.current_actor().uid,
) as (cache_hit, (clients, from_aio)),
) as (cache_hit, (clients, _)),
AsyncExitStack() as stack
):

View File

@ -30,8 +30,8 @@ from functools import partial
from math import isnan
import time
from typing import (
Any,
Callable,
Optional,
Awaitable,
)
@ -180,8 +180,8 @@ async def open_history_client(
async def get_hist(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
@ -192,6 +192,7 @@ async def open_history_client(
fqme,
timeframe,
end_dt=end_dt,
start_dt=start_dt,
)
latency = time.time() - query_start
if (
@ -325,6 +326,7 @@ async def wait_on_data_reset(
_data_resetter_task: trio.Task | None = None
_failed_resets: int = 0
async def get_bars(
proxy: MethodProxy,
@ -333,6 +335,7 @@ async def get_bars(
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
start_dt: str | None = '',
# TODO: make this more dynamic based on measured frame rx latency?
# how long before we trigger a feed reset (seconds)
@ -387,15 +390,31 @@ async def get_bars(
bars, bars_array, dt_duration = out
# not enough bars signal, likely due to venue
# operational gaps.
too_little: bool = False
if (
not bars
and end_dt
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
end_dt
and (
not bars
or (too_little :=
start_dt
and (len(bars) * timeframe)
< dt_duration.in_seconds()
)
)
end_dt -= dt_duration
continue
):
if (
end_dt
or too_little
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
raise NoData(f'{end_dt}')
if bars_array is None:
raise SymbolNotFound(fqme)
@ -544,6 +563,7 @@ async def get_bars(
await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
assert result
return result, data_cs is not None
@ -602,13 +622,12 @@ async def _setup_quote_stream(
'''
global _quote_streams
to_trio.send_nowait(None)
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# NOTE: it's batch-wise and slow af but I guess could
@ -700,7 +719,9 @@ async def open_aio_quote_stream(
symbol=symbol,
contract=contract,
) as (first, from_aio):
) as (contract, from_aio):
assert contract
# cache feed for later consumers
_quote_streams[symbol] = from_aio
@ -783,7 +804,6 @@ async def get_mkt_info(
# bs_fqme, _, broker = fqme.partition('.')
proxy: MethodProxy
get_details: bool = False
if proxy is not None:
client_ctx = nullcontext(proxy)
else:
@ -800,7 +820,6 @@ async def get_mkt_info(
raise
# TODO: more consistent field translation
init_info: dict = {}
atype = _asset_type_map[con.secType]
if atype == 'commodity':
@ -912,7 +931,8 @@ async def stream_quotes(
con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con)
first_quote: dict = normalize(first_ticker)
log.runtime(f'FIRST QUOTE: {first_quote}')
log.warning(f'FIRST QUOTE: {first_quote}')
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
@ -1045,7 +1065,7 @@ async def open_symbol_search(
await ctx.started({})
async with (
open_client_proxies() as (proxies, clients),
open_client_proxies() as (proxies, _),
open_data_client() as data_proxy,
):
async with ctx.open_stream() as stream: