Return history-frame duration from `.bars()`

This allows the history manager to know the decrement size for
`end_dt: datetime` on the next query when a no-data / gap case is
encountered; `get_bars()` now subtracts this duration in such cases.
Define the expected `pendulum.Duration`s in the `.api._samplings`
table.

Also add a bit of query-latency profiling that we may use later to
more dynamically determine timeout-driven data feed resets. Factor
the `162` error cases into a common exception handler block.
Tyler Goodlet 2022-09-29 14:32:25 -04:00
parent 13e886c967
commit 96f5a8abb8
2 changed files with 71 additions and 52 deletions
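
In outline, the change threads the per-timeframe `pendulum.Duration`
through the query path so the backfill loop can step `end_dt` back by
exactly one history frame on empty results. A minimal sketch of that
idea (names here are illustrative, not the actual piker API):

```python
import pendulum

# illustrative mirror of the extended `.api._samplings` table:
# sample period (s) -> (bar size, duration str, frame duration)
samplings = {
    1: ('1 secs', '2000 S', pendulum.duration(seconds=2000)),
    60: ('1 min', '1 D', pendulum.duration(days=1)),
}

def next_end_dt(end_dt, sample_period_s: int):
    # on a no-data/gap frame, step ``end_dt`` back by exactly one
    # history frame instead of a hardcoded day
    _, _, frame_duration = samplings[sample_period_s]
    return end_dt - frame_duration

# e.g. a blank 1m frame ending now retries one day further back:
print(next_end_dt(pendulum.now('UTC'), 60))
```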

```diff
@@ -43,6 +43,7 @@ from bidict import bidict
 import trio
 import tractor
 from tractor import to_asyncio
+import pendulum
 import ib_insync as ibis
 from ib_insync.contract import (
     Contract,
@@ -52,6 +53,7 @@ from ib_insync.contract import (
 from ib_insync.order import Order
 from ib_insync.ticker import Ticker
 from ib_insync.objects import (
+    BarDataList,
     Position,
     Fill,
     Execution,
@@ -248,7 +250,7 @@ _enters = 0
 def bars_to_np(bars: list) -> np.ndarray:
     '''
-    Convert a "bars list thing" (``BarsList`` type from ibis)
+    Convert a "bars list thing" (``BarDataList`` type from ibis)
     into a numpy struct array.

     '''
@@ -274,10 +276,18 @@ def bars_to_np(bars: list) -> np.ndarray:
 # but they say "use with discretion":
 # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
 _samplings: dict[int, tuple[str, str]] = {
-    1: ('1 secs', f'{int(2e3)} S'),
+    1: (
+        '1 secs',
+        f'{int(2e3)} S',
+        pendulum.duration(seconds=2e3),
+    ),
     # TODO: benchmark >1 D duration on query to see if
     # throughput can be made faster during backfilling.
-    60: ('1 min', '1 D'),
+    60: (
+        '1 min',
+        '1 D',
+        pendulum.duration(days=1),
+    ),
 }
@@ -344,7 +354,7 @@ class Client:
         **kwargs,

-    ) -> list[dict[str, Any]]:
+    ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
         '''
         Retreive OHLCV bars for a fqsn over a range to the present.
@@ -353,7 +363,7 @@
         # https://interactivebrokers.github.io/tws-api/historical_data.html
         bars_kwargs = {'whatToShow': 'TRADES'}
         bars_kwargs.update(kwargs)
-        bar_size, duration = _samplings[sample_period_s]
+        bar_size, duration, dt_duration = _samplings[sample_period_s]

         global _enters
         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
@@ -408,7 +418,7 @@
         # way to detect a timeout.

         nparr = bars_to_np(bars)
-        return bars, nparr
+        return bars, nparr, dt_duration

     async def con_deats(
         self,
```
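
With the signature change above, callers unpack a 3-tuple from
`.bars()`. A sketch of the consuming side (the wrapper name and exact
keyword arguments are assumptions based on the hunks above, not
verbatim piker code):

```python
async def fetch_ohlcv_frame(client, fqsn: str, end_dt):
    # ``Client.bars()`` now also hands back the frame's wall-clock
    # span so callers can decrement query windows without a lookup
    bars, nparr, dt_duration = await client.bars(
        fqsn,
        end_dt=end_dt,
    )
    if not bars:
        # empty frame: step back by exactly one frame's duration
        end_dt = end_dt - dt_duration
    return nparr, end_dt
```

The point of returning `dt_duration` is that only `.bars()` (via
`_samplings`) knows how much wall-clock time one frame covers.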

```diff
@@ -122,19 +122,36 @@ async def open_history_client(
     async with open_data_client() as proxy:

+        max_timeout: float = 2.
+        mean: float = 0
+        count: int = 0
+
         async def get_hist(
             timeframe: float,
             end_dt: Optional[datetime] = None,
             start_dt: Optional[datetime] = None,

         ) -> tuple[np.ndarray, str]:
-            out = await get_bars(
+            nonlocal max_timeout, mean, count
+
+            query_start = time.time()
+            out, timedout = await get_bars(
                 proxy,
                 symbol,
                 timeframe,
                 end_dt=end_dt,
             )
+            latency = time.time() - query_start
+            if (
+                not timedout
+                # and latency <= max_timeout
+            ):
+                count += 1
+                mean += latency / count
+                print(
+                    f'HISTORY FRAME QUERY LATENCY: {latency}\n'
+                    f'mean: {mean}'
+                )

             if out is None:
                 # could be trying to retreive bars over weekend
```
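
A note on the profiling arithmetic: `mean += latency / count`
accumulates progressively down-weighted samples rather than the true
arithmetic mean. If this metric ends up driving timeout tuning, the
exact incremental update is `mean += (latency - mean) / count`; a
minimal sketch:

```python
count: int = 0
mean: float = 0.0

def record_latency(latency: float) -> float:
    # exact incremental mean: mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n
    global count, mean
    count += 1
    mean += (latency - mean) / count
    return mean

assert record_latency(2.0) == 2.0
assert record_latency(4.0) == 3.0  # mean of [2.0, 4.0]
```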
```diff
@@ -245,6 +262,8 @@ async def get_bars(
     # blank to start which tells ib to look up the latest datum
     end_dt: str = '',

+    # TODO: make this more dynamic based on measured frame rx latency..
     timeout: float = 1.5,  # how long before we trigger a feed reset

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
```
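
On the new TODO: one way to make the reset timeout latency-driven is
an exponentially-weighted moving average of observed frame-receive
latencies, with a safety multiple floored at the current static
default. A hedged sketch, not part of this commit; all names and
parameters are illustrative:

```python
class LatencyTimeout:
    '''
    Derive a feed-reset timeout from smoothed frame-rx latency.

    '''
    def __init__(self, alpha: float = 0.2, multiple: float = 3.0):
        self.alpha = alpha        # EWMA smoothing factor
        self.multiple = multiple  # timeout = multiple * smoothed latency
        self.ewma = None

    def update(self, latency: float) -> None:
        if self.ewma is None:
            self.ewma = latency
        else:
            self.ewma += self.alpha * (latency - self.ewma)

    def value(self, floor: float = 1.5) -> float:
        # never drop below the static default used today
        if self.ewma is None:
            return floor
        return max(floor, self.multiple * self.ewma)
```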
```diff
@@ -280,17 +299,15 @@
                 # timeout=timeout,
             )
             if out is None:
-                raise NoData(
-                    f'{end_dt}',
-                    # frame_size=2000,
-                )
+                raise NoData(f'{end_dt}')

-            bars, bars_array = out
+            bars, bars_array, dt_duration = out

             if not bars:
-                # TODO: duration lookup for this
-                end_dt = end_dt.subtract(days=1)
-                print("SUBTRACTING DAY")
+                log.warning(
+                    f'History is blank for {dt_duration} from {end_dt}'
+                )
+                end_dt = end_dt.subtract(dt_duration)
                 continue

             if bars_array is None:
```
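
The blank-frame branch now decrements by the frame's
`pendulum.Duration` instead of a hardcoded day. For reference,
pendulum datetimes support direct `-` arithmetic with a `Duration`,
while `.subtract()` takes unit keyword args; a quick check, assuming
pendulum 2.x semantics:

```python
import pendulum

end_dt = pendulum.datetime(2022, 9, 29)
dt_duration = pendulum.duration(days=1)

# operator arithmetic with a Duration and keyword-arg ``.subtract()``
# land on the same instant
assert end_dt - dt_duration == end_dt.subtract(days=1)
```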
```diff
@@ -328,42 +345,35 @@
                     f'Symbol: {fqsn}',
                 )

-            elif (
-                err.code == 162 and
-                'HMDS query returned no data' in err.message
-            ):
-                # XXX: this is now done in the storage mgmt layer
-                # and we shouldn't implicitly decrement the frame dt
-                # index since the upper layer may be doing so
-                # concurrently and we don't want to be delivering frames
-                # that weren't asked for.
-                log.warning(
-                    f'NO DATA found ending @ {end_dt}\n'
-                )
-
-                # try to decrement start point and look further back
-                # end_dt = end_dt.subtract(seconds=2000)
-                end_dt = end_dt.subtract(days=1)
-                print("SUBTRACTING DAY")
-                continue
-
-            elif (
-                err.code == 162 and
-                'API historical data query cancelled' in err.message
-            ):
-                log.warning(
-                    'Query cancelled by IB (:eyeroll:):\n'
-                    f'{err.message}'
-                )
-                continue
-
-            # elif (
-            #     err.code == 162 and
-            #     'Trading TWS session is connected from a different IP
-            #     address' in err.message
-            # ):
-            #     log.warning("ignoring ip address warning")
-            #     continue
+            elif err.code == 162:
+                if 'HMDS query returned no data' in err.message:
+                    # XXX: this is now done in the storage mgmt
+                    # layer and we shouldn't implicitly decrement
+                    # the frame dt index since the upper layer may
+                    # be doing so concurrently and we don't want to
+                    # be delivering frames that weren't asked for.
+                    log.warning(
+                        f'NO DATA found ending @ {end_dt}\n'
+                    )
+
+                    # try to decrement start point and look further back
+                    # end_dt = end_dt.subtract(seconds=2000)
+                    end_dt = end_dt.subtract(days=1)
+                    print("SUBTRACTING DAY")
+                    continue
+
+                elif 'API historical data query cancelled' in err.message:
+                    log.warning(
+                        'Query cancelled by IB (:eyeroll:):\n'
+                        f'{err.message}'
+                    )
+                    continue
+
+                elif (
+                    'Trading TWS session is connected from a different IP'
+                    in err.message
+                ):
+                    log.warning("ignoring ip address warning")
+                    continue

             # XXX: more or less same as above timeout case
             elif _pacing in msg:
```
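
The restructure above checks the shared IB error code once and then
dispatches on message substrings. The same pattern in isolation, as a
standalone sketch rather than the piker handler itself:

```python
def classify_ib_162(code: int, message: str) -> str:
    # common handler for IB error 162: match the code once, then
    # distinguish the cases by message substring
    if code != 162:
        return 'other'
    if 'HMDS query returned no data' in message:
        return 'no-data'
    if 'API historical data query cancelled' in message:
        return 'cancelled'
    if 'Trading TWS session is connected from a different IP' in message:
        return 'ip-warning'
    return 'unknown-162'

assert classify_ib_162(162, 'HMDS query returned no data') == 'no-data'
```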
```diff
@@ -402,7 +412,7 @@ async def get_bars(
             with trio.move_on_after(timeout):
                 await result_ready.wait()
-                continue
+                break

             # spawn new data reset task
             data_cs, reset_done = await nurse.start(
@@ -410,13 +420,12 @@
                     wait_on_data_reset,
                     proxy,
                     timeout=float('inf'),
-                    # timeout=timeout,
                 )
             )
             # sync wait on reset to complete
             await reset_done.wait()

-    return result
+    return result, data_cs is not None

 asset_type_map = {
```
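
Finally, the new second return value is just `data_cs is not None`:
the cancel scope only exists once a reset task has been spawned, so
the flag reads as "this query hit the timeout path at least once",
which is exactly the `timedout` that `get_hist` unpacks above. A tiny
sketch of that contract (helper name is hypothetical):

```python
from typing import Any, Optional

def as_get_bars_return(result: Any, data_cs: Optional[object]) -> tuple:
    # mirrors ``return result, data_cs is not None``: the flag is
    # "was a data-feed reset ever triggered while filling this frame?"
    return result, data_cs is not None

assert as_get_bars_return('frame', None) == ('frame', False)
assert as_get_bars_return('frame', object()) == ('frame', True)
```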