diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 98c3a979..eea26b62 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -46,8 +46,8 @@ import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from fuzzywuzzy import process as fuzzy +import numpy as np -from .api import open_cached_client from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df @@ -143,11 +143,21 @@ class NonShittyIB(ibis.IB): # map of symbols to contract ids _adhoc_cmdty_data_map = { # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - # NOTE: cmdtys don't have trade data: + + # NOTE: some cmdtys/metals don't have trade data like gold/usd: # https://groups.io/g/twsapi/message/44174 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_adhoc_futes_set = { + 'nq.globex', + 'mnq.globex', + 'es.globex', + 'mes.globex', +} + + # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + _enters = 0 @@ -650,6 +660,8 @@ async def _aio_run_client_method( if to_trio and 'to_trio' in args: kwargs['to_trio'] = to_trio + log.runtime(f'Running {meth}({kwargs})') + return await async_meth(**kwargs) @@ -786,13 +798,64 @@ def normalize( return data +async def get_bars( + sym: str, + end_dt: str = "", +) -> (dict, np.ndarray): + + _err = None + + for _ in range(1): + try: + + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + end_dt=end_dt, + ) + + if bars_array is None: + raise SymbolNotFound(sym) + + next_dt = bars[0].date + + return bars, bars_array, next_dt + + except RequestError as err: + _err = err + + # TODO: retreive underlying ``ib_insync`` error? + if err.code == 162: + + if 'HMDS query returned no data' in err.message: + # means we hit some kind of historical "dead zone" + # and further requests seem to always cause + # throttling despite the rps being low + break + + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) + + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + await tractor.breakpoint() + + else: # throttle wasn't fixed so error out immediately + raise _err + + async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + count: int = 6, # NOTE: any more and we'll overrun the underlying buffer task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: """Fill historical bars into shared mem / storage afap. @@ -800,10 +863,7 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 """ - first_bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - ) + first_bars, bars_array, next_dt = await get_bars(sym) # write historical data to buffer shm.push(bars_array) @@ -812,46 +872,12 @@ async def backfill_bars( task_status.started(cs) - next_dt = first_bars[0].date - i = 0 while i < count: - try: - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - end_dt=next_dt, - ) - - if bars_array is None: - raise SymbolNotFound(sym) - - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars[0].date - - except RequestError as err: - # TODO: retreive underlying ``ib_insync`` error? - - if err.code == 162: - - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low - break - - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f`" - "in TWS" - ) - - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - await tractor.breakpoint() + bars, bars_array, next_dt = await get_bars(sym, end_dt=next_dt) + shm.push(bars_array, prepend=True) + i += 1 asset_type_map = { @@ -1201,28 +1227,59 @@ async def stream_trades( @tractor.context async def open_symbol_search( ctx: tractor.Context, -) -> Client: - async with open_cached_client('ib') as client: +) -> None: + # async with open_cached_client('ib') as client: - # load all symbols locally for fast search - await ctx.started({}) + # load all symbols locally for fast search + await ctx.started({}) - async with ctx.open_stream() as stream: + async with ctx.open_stream() as stream: - async for pattern in stream: + last = time.time() - if not pattern: - # will get error on empty request - continue + async for pattern in stream: + log.debug(f'received {pattern}') + now = time.time() - results = await client.search_stocks(pattern=pattern, upto=5) + assert pattern, 'IB can not accept blank search pattern' - matches = fuzzy.extractBests( - pattern, - results, - score_cutoff=50, - ) - await stream.send( - {item[2]: item[0] - for item in matches} - ) + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + # if new: + # pattern = new + except trio.WouldBlock: + pass + + log.debug(f'searching for {pattern}') + # await tractor.breakpoint() + last = time.time() + results = await _trio_run_client_method( + method='search_stocks', + pattern=pattern, + upto=5, + ) + log.debug(f'got results {results.keys()}') + # results = await client.search_stocks( + # pattern=pattern, upto=5) + + # if cs.cancelled_caught: + # print(f'timed out search for {pattern} !?') + # # await tractor.breakpoint() + # await stream.send({}) + # continue + + log.debug("fuzzy matching") + matches = fuzzy.extractBests( + pattern, + results, + score_cutoff=50, + ) + + matches = {item[2]: item[0] for item in matches} + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches)