ib: add `.pause()` on symbol query overruns to aid in fixing the issue

distribute_dis
Tyler Goodlet 2023-12-04 13:10:15 -05:00
parent 22bd83943b
commit 9245d24b47
3 changed files with 11 additions and 7 deletions

View File

@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = {
# throughput can be made faster during backfilling. # throughput can be made faster during backfilling.
60: ( 60: (
'1 min', '1 min',
'1 D', '2 D',
pendulum.duration(days=1), pendulum.duration(days=2),
), ),
} }

View File

@ -818,7 +818,7 @@ async def stream_quotes(
details: ibis.ContractDetails details: ibis.ContractDetails
async with ( async with (
open_data_client() as proxy, open_data_client() as proxy,
trio.open_nursery() as tn, # trio.open_nursery() as tn,
): ):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,

View File

@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now = time.time() now: float = time.time()
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
@ -261,7 +261,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# defined adhoc symbol set. # defined adhoc symbol set.
stock_results = [] stock_results = []
async def stash_results(target: Awaitable[list]): async def extend_results(
target: Awaitable[list]
) -> None:
try: try:
results = await target results = await target
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
@ -274,7 +276,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as sn:
sn.start_soon( sn.start_soon(
stash_results, extend_results,
proxy.search_symbols( proxy.search_symbols(
pattern=pattern, pattern=pattern,
upto=5, upto=5,
@ -289,8 +291,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'Search timeout? {proxy._aio_ns.ib.client}' f'Search timeout? {proxy._aio_ns.ib.client}'
) )
continue continue
else: elif stock_results:
break break
# else:
await tractor.pause()
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract( # adhoc_matches = fuzzy.extract(