From 9245d24b4774c2cca4172920bbf2056010990b51 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Dec 2023 13:10:15 -0500 Subject: [PATCH] ib: add `.pause()` on symbol query overruns to aid in fixing the issue --- piker/brokers/ib/api.py | 4 ++-- piker/brokers/ib/feed.py | 2 +- piker/brokers/ib/symbols.py | 12 ++++++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 6ff44e6d..2d13541a 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = { # throughput can be made faster during backfilling. 60: ( '1 min', - '1 D', - pendulum.duration(days=1), + '2 D', + pendulum.duration(days=2), ), } diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 734ecc39..bf84cba4 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -818,7 +818,7 @@ async def stream_quotes( details: ibis.ContractDetails async with ( open_data_client() as proxy, - trio.open_nursery() as tn, + # trio.open_nursery() as tn, ): mkt, details = await get_mkt_info( sym, diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index 99b3a801..6fe86aa9 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None: last = time.time() async for pattern in stream: log.info(f'received {pattern}') - now = time.time() + now: float = time.time() # this causes tractor hang... # assert 0 @@ -261,7 +261,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None: # defined adhoc symbol set. stock_results = [] - async def stash_results(target: Awaitable[list]): + async def extend_results( + target: Awaitable[list] + ) -> None: try: results = await target except tractor.trionics.Lagged: @@ -274,7 +276,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None: with trio.move_on_after(3) as cs: async with trio.open_nursery() as sn: sn.start_soon( - stash_results, + extend_results, proxy.search_symbols( pattern=pattern, upto=5, @@ -289,8 +291,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None: f'Search timeout? {proxy._aio_ns.ib.client}' ) continue - else: + elif stock_results: break + # else: + await tractor.pause() # # match against our ad-hoc set immediately # adhoc_matches = fuzzy.extract(