Add symbol searching for ib backend

Obviously this only supports stocks to start, it looks like we might
actually have to hard code some of the futures/forex/cmdtys that don't
have a search.. so lame. Special throttling is added here since the api
will grog out at anything more then 1Hz.

Additionally, decouple the bar loading request error handling from the
shm pushing loop so that we can always recover from a historical bars
throttle-error even if it's on the first try for a new symbol.
symbol_search
Tyler Goodlet 2021-05-12 08:32:15 -04:00
parent 25d7122cb6
commit e5e9a7c582
1 changed files with 120 additions and 63 deletions

View File

@ -46,8 +46,8 @@ import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np
from .api import open_cached_client
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df from ..data._source import from_df
@ -143,11 +143,21 @@ class NonShittyIB(ibis.IB):
# map of symbols to contract ids # map of symbols to contract ids
_adhoc_cmdty_data_map = { _adhoc_cmdty_data_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: cmdtys don't have trade data:
# NOTE: some cmdtys/metals don't have trade data like gold/usd:
# https://groups.io/g/twsapi/message/44174 # https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
} }
_adhoc_futes_set = {
'nq.globex',
'mnq.globex',
'es.globex',
'mes.globex',
}
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
_enters = 0 _enters = 0
@ -650,6 +660,8 @@ async def _aio_run_client_method(
if to_trio and 'to_trio' in args: if to_trio and 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
log.runtime(f'Running {meth}({kwargs})')
return await async_meth(**kwargs) return await async_meth(**kwargs)
@ -786,13 +798,64 @@ def normalize(
return data return data
async def get_bars(
sym: str,
end_dt: str = "",
) -> (dict, np.ndarray):
_err = None
for _ in range(1):
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
end_dt=end_dt,
)
if bars_array is None:
raise SymbolNotFound(sym)
next_dt = bars[0].date
return bars, bars_array, next_dt
except RequestError as err:
_err = err
# TODO: retreive underlying ``ib_insync`` error?
if err.code == 162:
if 'HMDS query returned no data' in err.message:
# means we hit some kind of historical "dead zone"
# and further requests seem to always cause
# throttling despite the rps being low
break
else:
log.exception(
"Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
)
# TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here
# that restarts/resumes this task?
await tractor.breakpoint()
else: # throttle wasn't fixed so error out immediately
raise _err
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
# count: int = 20, # NOTE: any more and we'll overrun underlying buffer # count: int = 20, # NOTE: any more and we'll overrun underlying buffer
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer count: int = 6, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. """Fill historical bars into shared mem / storage afap.
@ -800,10 +863,7 @@ async def backfill_bars(
https://github.com/pikers/piker/issues/128 https://github.com/pikers/piker/issues/128
""" """
first_bars, bars_array = await _trio_run_client_method( first_bars, bars_array, next_dt = await get_bars(sym)
method='bars',
symbol=sym,
)
# write historical data to buffer # write historical data to buffer
shm.push(bars_array) shm.push(bars_array)
@ -812,46 +872,12 @@ async def backfill_bars(
task_status.started(cs) task_status.started(cs)
next_dt = first_bars[0].date
i = 0 i = 0
while i < count: while i < count:
try: bars, bars_array, next_dt = await get_bars(sym, end_dt=next_dt)
bars, bars_array = await _trio_run_client_method( shm.push(bars_array, prepend=True)
method='bars', i += 1
symbol=sym,
end_dt=next_dt,
)
if bars_array is None:
raise SymbolNotFound(sym)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error?
if err.code == 162:
if 'HMDS query returned no data' in err.message:
# means we hit some kind of historical "dead zone"
# and further requests seem to always cause
# throttling despite the rps being low
break
else:
log.exception(
"Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
)
# TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here
# that restarts/resumes this task?
await tractor.breakpoint()
asset_type_map = { asset_type_map = {
@ -1201,28 +1227,59 @@ async def stream_trades(
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> None:
async with open_cached_client('ib') as client: # async with open_cached_client('ib') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
await ctx.started({}) await ctx.started({})
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: last = time.time()
if not pattern: async for pattern in stream:
# will get error on empty request log.debug(f'received {pattern}')
continue now = time.time()
results = await client.search_stocks(pattern=pattern, upto=5) assert pattern, 'IB can not accept blank search pattern'
matches = fuzzy.extractBests( # throttle search requests to no faster then 1Hz
pattern, diff = now - last
results, if diff < 1.0:
score_cutoff=50, log.debug('throttle sleeping')
) await trio.sleep(diff)
await stream.send( try:
{item[2]: item[0] pattern = stream.receive_nowait()
for item in matches} # if new:
) # pattern = new
except trio.WouldBlock:
pass
log.debug(f'searching for {pattern}')
# await tractor.breakpoint()
last = time.time()
results = await _trio_run_client_method(
method='search_stocks',
pattern=pattern,
upto=5,
)
log.debug(f'got results {results.keys()}')
# results = await client.search_stocks(
# pattern=pattern, upto=5)
# if cs.cancelled_caught:
# print(f'timed out search for {pattern} !?')
# # await tractor.breakpoint()
# await stream.send({})
# continue
log.debug("fuzzy matching")
matches = fuzzy.extractBests(
pattern,
results,
score_cutoff=50,
)
matches = {item[2]: item[0] for item in matches}
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)