POC using paper-in-docker gw for symbol search

livenpaper
Tyler Goodlet 2022-07-21 10:30:23 -04:00
parent 38b190e598
commit 848e345364
1 changed files with 30 additions and 14 deletions

View File

@ -577,7 +577,6 @@ def normalize(
# check for special contract types # check for special contract types
con = ticker.contract con = ticker.contract
fqsn, calc_price = con2fqsn(con) fqsn, calc_price = con2fqsn(con)
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
@ -868,15 +867,24 @@ async def open_symbol_search(
# TODO: load user defined symbol set locally for fast search? # TODO: load user defined symbol set locally for fast search?
await ctx.started({}) await ctx.started({})
async with open_data_client() as proxy: # async with open_data_client() as proxy:
async with (
open_client_proxies() as (proxies, clients),
):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# await tractor.breakpoint()
proxy = proxies['ib.algopaper']
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
log.debug(f'received {pattern}') log.info(f'received {pattern}')
now = time.time() now = time.time()
# this causes tractor hang...
# assert 0
assert pattern, 'IB can not accept blank search pattern' assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz # throttle search requests to no faster then 1Hz
@ -904,7 +912,7 @@ async def open_symbol_search(
continue continue
log.debug(f'searching for {pattern}') log.info(f'searching for {pattern}')
last = time.time() last = time.time()
@ -915,6 +923,8 @@ async def open_symbol_search(
async def stash_results(target: Awaitable[list]): async def stash_results(target: Awaitable[list]):
stock_results.extend(await target) stock_results.extend(await target)
for i in range(10):
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as sn:
sn.start_soon( sn.start_soon(
stash_results, stash_results,
@ -927,6 +937,12 @@ async def open_symbol_search(
# trigger async request # trigger async request
await trio.sleep(0) await trio.sleep(0)
if cs.cancelled_caught:
log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
continue
else:
break
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extractBests( # adhoc_matches = fuzzy.extractBests(
# pattern, # pattern,