From 848e3453641a1f60933bc54d9279a34203b3dc51 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Jul 2022 10:30:23 -0400 Subject: [PATCH] POC using paper-in-docker gw for symbol search --- piker/brokers/ib/feed.py | 44 +++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 294ebc99..85634c18 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -577,7 +577,6 @@ def normalize( # check for special contract types con = ticker.contract - fqsn, calc_price = con2fqsn(con) # convert named tuples to dicts so we send usable keys @@ -868,15 +867,24 @@ async def open_symbol_search( # TODO: load user defined symbol set locally for fast search? await ctx.started({}) - async with open_data_client() as proxy: + # async with open_data_client() as proxy: + async with ( + open_client_proxies() as (proxies, clients), + ): async with ctx.open_stream() as stream: + # await tractor.breakpoint() + proxy = proxies['ib.algopaper'] + last = time.time() async for pattern in stream: - log.debug(f'received {pattern}') + log.info(f'received {pattern}') now = time.time() + # this causes tractor hang... + # assert 0 + assert pattern, 'IB can not accept blank search pattern' # throttle search requests to no faster then 1Hz @@ -904,7 +912,7 @@ async def open_symbol_search( continue - log.debug(f'searching for {pattern}') + log.info(f'searching for {pattern}') last = time.time() @@ -915,17 +923,25 @@ async def open_symbol_search( async def stash_results(target: Awaitable[list]): stock_results.extend(await target) - async with trio.open_nursery() as sn: - sn.start_soon( - stash_results, - proxy.search_symbols( - pattern=pattern, - upto=5, - ), - ) + for i in range(10): + with trio.move_on_after(3) as cs: + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + proxy.search_symbols( + pattern=pattern, + upto=5, + ), + ) - # trigger async request - await trio.sleep(0) + # trigger async request + await trio.sleep(0) + + if cs.cancelled_caught: + log.warning(f'Search timeout? {proxy._aio_ns.ib.client}') + continue + else: + break # # match against our ad-hoc set immediately # adhoc_matches = fuzzy.extractBests(