diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f081bd22..89a8fe11 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,7 +25,10 @@ from contextlib import asynccontextmanager from dataclasses import asdict from datetime import datetime from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator +from typing import ( + List, Dict, Any, Tuple, Optional, + AsyncIterator, Awaitable, +) import asyncio from pprint import pformat import inspect @@ -292,12 +295,47 @@ class Client: df = ibis.util.df(bars) return bars, from_df(df) + async def con_deats( + self, + contracts: list[Contract], + + ) -> dict[str, ContractDetails]: + + futs = [] + for con in contracts: + if con.primaryExchange not in _exch_skip_list: + futs.append(self.ib.reqContractDetailsAsync(con)) + + # batch request all details + results = await asyncio.gather(*futs) + + # XXX: if there is more then one entry in the details list + details = {} + for details_set in results: + + # then the contract is so called "ambiguous". + for d in details_set: + con = d.contract + unique_sym = f'{con.symbol}.{con.primaryExchange}' + + as_dict = asdict(d) + # nested dataclass we probably don't need and that + # won't IPC serialize + as_dict.pop('secIdList') + + details[unique_sym] = as_dict + + return details + async def search_stocks( self, pattern: str, + + get_details: bool = False, # how many contracts to search "up to" upto: int = 3, - ) -> Dict[str, ContractDetails]: + + ) -> dict[str, ContractDetails]: """Search for stocks matching provided ``str`` pattern. Return a dictionary of ``upto`` entries worth of contract details. @@ -305,37 +343,22 @@ class Client: descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) if descriptions is not None: + descrs = descriptions[:upto] - futs = [] - for d in descriptions: - con = d.contract - if con.primaryExchange not in _exch_skip_list: - futs.append(self.ib.reqContractDetailsAsync(con)) + if get_details: + return await self.con_deats([d.contract for d in descrs]) - # batch request all details - results = await asyncio.gather(*futs) - - # XXX: if there is more then one entry in the details list - details = {} - for details_set in results: - - # then the contract is so called "ambiguous". - for d in details_set: + else: + results = {} + for d in descrs: con = d.contract - unique_sym = f'{con.symbol}.{con.primaryExchange}' - - as_dict = asdict(d) - # nested dataclass we probably don't need and that - # won't IPC serialize - as_dict.pop('secIdList') - - details[unique_sym] = as_dict - - if len(details) == upto: - return details - - return details + # sometimes there's a weird extra suffix returned + # from search? + exch = con.primaryExchange.rsplit('.')[0] + unique_sym = f'{con.symbol}.{exch}' + results[unique_sym] = {} + return results else: return {} @@ -345,20 +368,12 @@ class Client: # how many contracts to search "up to" upto: int = 3, asdicts: bool = True, + ) -> Dict[str, ContractDetails]: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ - return await self.search_stocks(pattern, upto, asdicts) - - async def search_futes( - self, - pattern: str, - # how many contracts to search "up to" - upto: int = 3, - asdicts: bool = True, - ) -> Dict[str, ContractDetails]: - raise NotImplementedError + return await self.search_stocks(pattern, upto, get_details=True) async def get_cont_fute( self, @@ -1564,20 +1579,51 @@ async def open_symbol_search( log.debug(f'searching for {pattern}') last = time.time() - results = await _trio_run_client_method( - method='search_stocks', - pattern=pattern, - upto=5, - ) - log.debug(f'got results {results.keys()}') - log.debug("fuzzy matching") - matches = fuzzy.extractBests( + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + stock_results.extend(await target) + + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + _trio_run_client_method( + method='search_stocks', + pattern=pattern, + upto=5, + ) + ) + + # trigger async request + await trio.sleep(0) + + # match against our ad-hoc set immediately + adhoc_matches = fuzzy.extractBests( + pattern, + list(_adhoc_futes_set), + score_cutoff=90, + ) + log.info(f'fuzzy matched adhocs: {adhoc_matches}') + adhoc_match_results = {} + if adhoc_matches: + # TODO: do we need to pull contract details? + adhoc_match_results = {i[0]: {} for i in adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( pattern, - results, + stock_results, score_cutoff=50, ) - matches = {item[2]: item[0] for item in matches} + matches = adhoc_match_results | { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} + log.debug(f"sending matches: {matches.keys()}") await stream.send(matches)