Add data-reset-task global state var
Allows keeping mutex state around data reset requests which (if more then one are sent) can cause a throttling condition where ib's servers will get slower and slower to conduct a reconnect. With this you can have multiple ongoing contract requests without hitting that issue and we can go back to having a nice 3s timeout on the history queries before activating the hack.clears_table_events
parent
55856f5e8b
commit
41ffccc59e
|
@ -254,6 +254,9 @@ async def wait_on_data_reset(
|
|||
return False
|
||||
|
||||
|
||||
_data_resetter_task: trio.Task | None = None
|
||||
|
||||
|
||||
async def get_bars(
|
||||
|
||||
proxy: MethodProxy,
|
||||
|
@ -264,7 +267,7 @@ async def get_bars(
|
|||
end_dt: str = '',
|
||||
|
||||
# TODO: make this more dynamic based on measured frame rx latency..
|
||||
timeout: float = 1.5, # how long before we trigger a feed reset
|
||||
timeout: float = 3, # how long before we trigger a feed reset
|
||||
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
|
@ -274,13 +277,15 @@ async def get_bars(
|
|||
a ``MethoProxy``.
|
||||
|
||||
'''
|
||||
data_cs: Optional[trio.CancelScope] = None
|
||||
result: Optional[tuple[
|
||||
global _data_resetter_task
|
||||
|
||||
data_cs: trio.CancelScope | None = None
|
||||
result: tuple[
|
||||
ibis.objects.BarDataList,
|
||||
np.ndarray,
|
||||
datetime,
|
||||
datetime,
|
||||
]] = None
|
||||
] | None = None
|
||||
result_ready = trio.Event()
|
||||
|
||||
async def query():
|
||||
|
@ -400,6 +405,10 @@ async def get_bars(
|
|||
else:
|
||||
raise
|
||||
|
||||
# TODO: make this global across all history task/requests
|
||||
# such that simultaneous symbol queries don't try data resettingn
|
||||
# too fast..
|
||||
unset_resetter: bool = False
|
||||
async with trio.open_nursery() as nurse:
|
||||
|
||||
# start history request that we allow
|
||||
|
@ -414,6 +423,14 @@ async def get_bars(
|
|||
await result_ready.wait()
|
||||
break
|
||||
|
||||
if _data_resetter_task:
|
||||
# don't double invoke the reset hack if another
|
||||
# requester task already has it covered.
|
||||
continue
|
||||
else:
|
||||
_data_resetter_task = trio.lowlevel.current_task()
|
||||
unset_resetter = True
|
||||
|
||||
# spawn new data reset task
|
||||
data_cs, reset_done = await nurse.start(
|
||||
partial(
|
||||
|
@ -425,6 +442,7 @@ async def get_bars(
|
|||
# sync wait on reset to complete
|
||||
await reset_done.wait()
|
||||
|
||||
_data_resetter_task = None if unset_resetter else _data_resetter_task
|
||||
return result, data_cs is not None
|
||||
|
||||
|
||||
|
@ -955,7 +973,14 @@ async def open_symbol_search(
|
|||
except trio.WouldBlock:
|
||||
pass
|
||||
|
||||
if not pattern or pattern.isspace():
|
||||
if (
|
||||
not pattern
|
||||
or pattern.isspace()
|
||||
|
||||
# XXX: not sure if this is a bad assumption but it
|
||||
# seems to make search snappier?
|
||||
or len(pattern) < 1
|
||||
):
|
||||
log.warning('empty pattern received, skipping..')
|
||||
|
||||
# TODO: *BUG* if nothing is returned here the client
|
||||
|
|
Loading…
Reference in New Issue