From 330d16262ecc7513978d9a838b8f46bc59a5fff0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Oct 2022 13:03:47 -0400 Subject: [PATCH] Add data-reset-task global state var Allows keeping mutex state around data reset requests which (if more then one are sent) can cause a throttling condition where ib's servers will get slower and slower to conduct a reconnect. With this you can have multiple ongoing contract requests without hitting that issue and we can go back to having a nice 3s timeout on the history queries before activating the hack. --- piker/brokers/ib/feed.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 47b7ed25..957b6249 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -254,6 +254,9 @@ async def wait_on_data_reset( return False +_data_resetter_task: trio.Task | None = None + + async def get_bars( proxy: MethodProxy, @@ -264,7 +267,7 @@ async def get_bars( end_dt: str = '', # TODO: make this more dynamic based on measured frame rx latency.. - timeout: float = 1.5, # how long before we trigger a feed reset + timeout: float = 3, # how long before we trigger a feed reset task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -274,13 +277,15 @@ async def get_bars( a ``MethoProxy``. ''' - data_cs: Optional[trio.CancelScope] = None - result: Optional[tuple[ + global _data_resetter_task + + data_cs: trio.CancelScope | None = None + result: tuple[ ibis.objects.BarDataList, np.ndarray, datetime, datetime, - ]] = None + ] | None = None result_ready = trio.Event() async def query(): @@ -400,6 +405,10 @@ async def get_bars( else: raise + # TODO: make this global across all history task/requests + # such that simultaneous symbol queries don't try data resettingn + # too fast.. + unset_resetter: bool = False async with trio.open_nursery() as nurse: # start history request that we allow @@ -414,6 +423,14 @@ async def get_bars( await result_ready.wait() break + if _data_resetter_task: + # don't double invoke the reset hack if another + # requester task already has it covered. + continue + else: + _data_resetter_task = trio.lowlevel.current_task() + unset_resetter = True + # spawn new data reset task data_cs, reset_done = await nurse.start( partial( @@ -425,6 +442,7 @@ async def get_bars( # sync wait on reset to complete await reset_done.wait() + _data_resetter_task = None if unset_resetter else _data_resetter_task return result, data_cs is not None @@ -955,7 +973,14 @@ async def open_symbol_search( except trio.WouldBlock: pass - if not pattern or pattern.isspace(): + if ( + not pattern + or pattern.isspace() + + # XXX: not sure if this is a bad assumption but it + # seems to make search snappier? + or len(pattern) < 1 + ): log.warning('empty pattern received, skipping..') # TODO: *BUG* if nothing is returned here the client