Add data-reset-task global state var

Allows keeping mutex state around data reset requests which (if more
then one are sent) can cause a throttling condition where ib's servers
will get slower and slower to conduct a reconnect. With this you can
have multiple ongoing contract requests without hitting that issue and
we can go back to having a nice 3s timeout on the history queries before
activating the hack.
ib_1m_hist
Tyler Goodlet 2022-10-19 13:03:47 -04:00
parent c7f57b940c
commit 330d16262e
1 changed files with 30 additions and 5 deletions

View File

@ -254,6 +254,9 @@ async def wait_on_data_reset(
return False return False
_data_resetter_task: trio.Task | None = None
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
@ -264,7 +267,7 @@ async def get_bars(
end_dt: str = '', end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency.. # TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 1.5, # how long before we trigger a feed reset timeout: float = 3, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -274,13 +277,15 @@ async def get_bars(
a ``MethoProxy``. a ``MethoProxy``.
''' '''
data_cs: Optional[trio.CancelScope] = None global _data_resetter_task
result: Optional[tuple[
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList, ibis.objects.BarDataList,
np.ndarray, np.ndarray,
datetime, datetime,
datetime, datetime,
]] = None ] | None = None
result_ready = trio.Event() result_ready = trio.Event()
async def query(): async def query():
@ -400,6 +405,10 @@ async def get_bars(
else: else:
raise raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
# start history request that we allow # start history request that we allow
@ -414,6 +423,14 @@ async def get_bars(
await result_ready.wait() await result_ready.wait()
break break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await nurse.start(
partial( partial(
@ -425,6 +442,7 @@ async def get_bars(
# sync wait on reset to complete # sync wait on reset to complete
await reset_done.wait() await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None return result, data_cs is not None
@ -955,7 +973,14 @@ async def open_symbol_search(
except trio.WouldBlock: except trio.WouldBlock:
pass pass
if not pattern or pattern.isspace(): if (
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client # TODO: *BUG* if nothing is returned here the client