POC using paper-in-docker gw for symbol search

ib_native_data_hack
Tyler Goodlet 2022-07-21 10:30:23 -04:00
parent a741ed3161
commit f26c399ad3
1 changed files with 30 additions and 14 deletions

View File

@ -576,7 +576,6 @@ def normalize(
# check for special contract types # check for special contract types
con = ticker.contract con = ticker.contract
fqsn, calc_price = con2fqsn(con) fqsn, calc_price = con2fqsn(con)
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
@ -866,15 +865,24 @@ async def open_symbol_search(
# TODO: load user defined symbol set locally for fast search? # TODO: load user defined symbol set locally for fast search?
await ctx.started({}) await ctx.started({})
async with open_data_client() as proxy: # async with open_data_client() as proxy:
async with (
open_client_proxies() as (proxies, clients),
):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# await tractor.breakpoint()
proxy = proxies['ib.algopaper']
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
log.debug(f'received {pattern}') log.info(f'received {pattern}')
now = time.time() now = time.time()
# this causes tractor hang...
# assert 0
assert pattern, 'IB can not accept blank search pattern' assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz # throttle search requests to no faster then 1Hz
@ -902,7 +910,7 @@ async def open_symbol_search(
continue continue
log.debug(f'searching for {pattern}') log.info(f'searching for {pattern}')
last = time.time() last = time.time()
@ -913,17 +921,25 @@ async def open_symbol_search(
async def stash_results(target: Awaitable[list]): async def stash_results(target: Awaitable[list]):
stock_results.extend(await target) stock_results.extend(await target)
async with trio.open_nursery() as sn: for i in range(10):
sn.start_soon( with trio.move_on_after(3) as cs:
stash_results, async with trio.open_nursery() as sn:
proxy.search_symbols( sn.start_soon(
pattern=pattern, stash_results,
upto=5, proxy.search_symbols(
), pattern=pattern,
) upto=5,
),
)
# trigger async request # trigger async request
await trio.sleep(0) await trio.sleep(0)
if cs.cancelled_caught:
log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
continue
else:
break
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extractBests( # adhoc_matches = fuzzy.extractBests(