diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index c98057a7..e0ad96c8 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -31,8 +31,6 @@ from .api import ( from .feed import ( open_history_client, stream_quotes, - get_mkt_info, - open_symbol_search, ) from .broker import ( open_trade_dialog, @@ -41,11 +39,11 @@ from .ledger import ( norm_trade, norm_trade_records, ) -# TODO: -# from .symbols import ( -# get_mkt_info, -# open_symbol_search, -# ) +from .symbols import ( + get_mkt_info, + open_symbol_search, + _search_conf, +) __all__ = [ 'get_client', @@ -56,6 +54,7 @@ __all__ = [ 'open_history_client', 'open_symbol_search', 'stream_quotes', + '_search_conf', ] _brokerd_mods: list[str] = [ @@ -65,6 +64,7 @@ _brokerd_mods: list[str] = [ _datad_mods: list[str] = [ 'feed', + 'symbols', ] diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 880e9f53..69e5cc3e 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -31,7 +31,6 @@ from dataclasses import ( from datetime import datetime from functools import ( partial, - # lru_cache, ) import itertools from math import isnan @@ -47,7 +46,6 @@ import inspect import time from types import SimpleNamespace - from bidict import bidict import trio import tractor @@ -67,7 +65,6 @@ from ib_insync import ( ) from ib_insync.contract import ( ContractDetails, - Option, ) from ib_insync.order import Order from ib_insync.ticker import Ticker @@ -88,6 +85,13 @@ import numpy as np # non-relative for backends so that non-builting backends # can be easily modelled after this style B) from piker import config +from .symbols import ( + con2fqme, + parse_patt2fqme, + _adhoc_symbol_map, + _exch_skip_list, + _futes_venues, +) from ._util import ( log, # only for the ib_sync internal logging @@ -133,15 +137,6 @@ _bar_sizes = { _show_wap_in_history: bool = False -# optional search config the backend can register for -# it's symbol search handling (in this case we avoid -# accepting patterns before the kb has settled more then -# a quarter second). -_search_conf = { - 'pause_period': 6 / 16, -} - - # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: class NonShittyWrapper(Wrapper): @@ -200,120 +195,6 @@ class NonShittyIB(IB): # self.errorEvent += self._onError self.client.apiEnd += self.disconnectedEvent - -_futes_venues = ( - 'GLOBEX', - 'NYMEX', - 'CME', - 'CMECRYPTO', - 'COMEX', - # 'CMDTY', # special name case.. - 'CBOT', # (treasury) yield futures -) - -_adhoc_cmdty_set = { - # metals - # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - 'xauusd.cmdty', # london gold spot ^ - 'xagusd.cmdty', # silver spot -} - -# NOTE: if you aren't seeing one of these symbol's futues contracts -# show up, it's likely the `.` part is wrong! -_adhoc_futes_set = { - - # equities - 'nq.cme', - 'mnq.cme', # micro - - 'es.cme', - 'mes.cme', # micro - - # cypto$ - 'brr.cme', - 'mbt.cme', # micro - 'ethusdrr.cme', - - # agriculture - 'he.comex', # lean hogs - 'le.comex', # live cattle (geezers) - 'gf.comex', # feeder cattle (younguns) - - # raw - 'lb.comex', # random len lumber - - 'gc.comex', - 'mgc.comex', # micro - - # oil & gas - 'cl.nymex', - - 'ni.comex', # silver futes - 'qi.comex', # mini-silver futes - - # treasury yields - # etfs by duration: - # SHY -> IEI -> IEF -> TLT - 'zt.cbot', # 2y - 'z3n.cbot', # 3y - 'zf.cbot', # 5y - 'zn.cbot', # 10y - 'zb.cbot', # 30y - - # (micros of above) - '2yy.cbot', - '5yy.cbot', - '10y.cbot', - '30y.cbot', -} - - -# taken from list here: -# https://www.interactivebrokers.com/en/trading/products-spot-currencies.php -_adhoc_fiat_set = set(( - 'USD, AED, AUD, CAD,' - 'CHF, CNH, CZK, DKK,' - 'EUR, GBP, HKD, HUF,' - 'ILS, JPY, MXN, NOK,' - 'NZD, PLN, RUB, SAR,' - 'SEK, SGD, TRY, ZAR' - ).split(' ,') -) - - -# map of symbols to contract ids -_adhoc_symbol_map = { - # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - - # NOTE: some cmdtys/metals don't have trade data like gold/usd: - # https://groups.io/g/twsapi/message/44174 - 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), -} -for qsn in _adhoc_futes_set: - sym, venue = qsn.split('.') - assert venue.upper() in _futes_venues, f'{venue}' - _adhoc_symbol_map[sym.upper()] = ( - {'exchange': venue}, - {}, - ) - - -# exchanges we don't support at the moment due to not knowing -# how to do symbol-contract lookup correctly likely due -# to not having the data feeds subscribed. -_exch_skip_list = { - - 'ASX', # aussie stocks - 'MEXI', # mexican stocks - - # no idea - 'VALUE', - 'FUNDSERV', - 'SWB2', - 'PSE', - 'PHLX', -} - _enters = 0 @@ -397,14 +278,13 @@ class Client: # as needed throughout this backend (eg. vnc sockaddr). self.conf = config + # NOTE: the ib.client here is "throttled" to 45 rps by default self.ib = ib - self.ib.RaiseRequestErrors = True + self.ib.RaiseRequestErrors: bool = True # contract cache self._cons: dict[str, Contract] = {} - # NOTE: the ib.client here is "throttled" to 45 rps by default - async def trades(self) -> list[dict]: ''' Return list of trade-fills from current session in ``dict``. @@ -544,14 +424,14 @@ class Client: ) -> dict[str, ContractDetails]: - futs = [] + futs: list[asyncio.Future] = [] for con in contracts: if con.primaryExchange not in _exch_skip_list: futs.append(self.ib.reqContractDetailsAsync(con)) # batch request all details try: - results = await asyncio.gather(*futs) + results: list[ContractDetails] = await asyncio.gather(*futs) except RequestError as err: msg = err.message if ( @@ -561,7 +441,7 @@ class Client: return {} # one set per future result - details = {} + details: dict[str, ContractDetails] = {} for details_set in results: # XXX: if there is more then one entry in the details list @@ -576,26 +456,28 @@ class Client: return details - async def search_stocks( + async def search_contracts( self, pattern: str, upto: int = 3, # how many contracts to search "up to" ) -> dict[str, ContractDetails]: ''' - Search for stocks matching provided ``str`` pattern. + Search for ``Contract``s matching provided ``str`` pattern. - Return a dictionary of ``upto`` entries worth of contract details. + Return a dictionary of ``upto`` entries worth of ``ContractDetails``. ''' - descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) - - if descriptions is None: + descrs: list[ContractDetails] = ( + await self.ib.reqMatchingSymbolsAsync(pattern) + ) + if descrs is None: return {} - # limit - descrs = descriptions[:upto] - return await self.con_deats([d.contract for d in descrs]) + return await self.con_deats( + # limit to first ``upto`` entries + [d.contract for d in descrs[:upto]] + ) async def search_symbols( self, @@ -609,7 +491,7 @@ class Client: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ try: - results = await self.search_stocks( + results = await self.search_contracts( pattern, upto=upto, ) @@ -712,8 +594,8 @@ class Client: return con - # TODO: make this work with our `MethodProxy`.. - # @lru_cache(maxsize=None) + # TODO: is this a better approach? + # @async_lifo_cache() async def get_con( self, conid: int, @@ -727,61 +609,6 @@ class Client: self._cons[conid] = con return con - def parse_patt2fqme( - self, - pattern: str, - - ) -> tuple[str, str, str, str]: - - # TODO: we can't use this currently because - # ``wrapper.starTicker()`` currently cashes ticker instances - # which means getting a singel quote will potentially look up - # a quote for a ticker that it already streaming and thus run - # into state clobbering (eg. list: Ticker.ticks). It probably - # makes sense to try this once we get the pub-sub working on - # individual symbols... - - # XXX UPDATE: we can probably do the tick/trades scraping - # inside our eventkit handler instead to bypass this entirely? - - currency = '' - - # fqme parsing stage - # ------------------ - if '.ib' in pattern: - from piker.accounting import unpack_fqme - _, symbol, venue, expiry = unpack_fqme(pattern) - - else: - symbol = pattern - expiry = '' - - # another hack for forex pairs lul. - if ( - '.idealpro' in symbol - # or '/' in symbol - ): - exch = 'IDEALPRO' - symbol = symbol.removesuffix('.idealpro') - if '/' in symbol: - symbol, currency = symbol.split('/') - - else: - # TODO: yes, a cache.. - # try: - # # give the cache a go - # return self._contracts[symbol] - # except KeyError: - # log.debug(f'Looking up contract for {symbol}') - expiry: str = '' - if symbol.count('.') > 1: - symbol, _, expiry = symbol.rpartition('.') - - # use heuristics to figure out contract "type" - symbol, exch = symbol.upper().rsplit('.', maxsplit=1) - - return symbol, currency, exch, expiry - async def find_contracts( self, pattern: Optional[str] = None, @@ -792,7 +619,7 @@ class Client: ) -> Contract: if pattern is not None: - symbol, currency, exch, expiry = self.parse_patt2fqme( + symbol, currency, exch, expiry = parse_patt2fqme( pattern, ) sectype = '' @@ -1145,80 +972,6 @@ class Client: return self.ib.positions(account=account) -def con2fqme( - con: Contract, - _cache: dict[int, (str, bool)] = {} - -) -> tuple[str, bool]: - ''' - Convert contracts to fqme-style strings to be used both in symbol-search - matching and as feed tokens passed to the front end data deed layer. - - Previously seen contracts are cached by id. - - ''' - # should be real volume for this contract by default - calc_price = False - if con.conId: - try: - return _cache[con.conId] - except KeyError: - pass - - suffix = con.primaryExchange or con.exchange - symbol = con.symbol - expiry = con.lastTradeDateOrContractMonth or '' - - match con: - case Option(): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - case ( - Commodity() - # search API endpoint returns std con box.. - | Contract(secType='CMDTY') - ): - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - - # no real volume on this tract - calc_price = True - - case Forex() | Contract(secType='CASH'): - dst, src = con.localSymbol.split('.') - symbol = ''.join([dst, src]) - suffix = con.exchange or 'idealpro' - - # no real volume on forex feeds.. - calc_price = True - - if not suffix: - entry = _adhoc_symbol_map.get( - con.symbol or con.localSymbol - ) - if entry: - meta, kwargs = entry - cid = meta.get('conId') - if cid: - assert con.conId == meta['conId'] - suffix = meta['exchange'] - - # append a `.` to the returned symbol - # key for derivatives that normally is the expiry - # date key. - if expiry: - suffix += f'.{expiry}' - - fqme_key = symbol.lower() - if suffix: - fqme_key = '.'.join((fqme_key, suffix)).lower() - - _cache[con.conId] = fqme_key, calc_price - return fqme_key, calc_price - - # per-actor API ep caching _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 7673c2c5..8a6ac949 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -21,9 +21,7 @@ from __future__ import annotations import asyncio from contextlib import ( asynccontextmanager as acm, - nullcontext, ) -from decimal import Decimal from dataclasses import asdict from datetime import datetime from functools import partial @@ -32,11 +30,9 @@ import time from typing import ( Any, Callable, - Awaitable, ) from async_generator import aclosing -from fuzzywuzzy import process as fuzzy import ib_insync as ibis import numpy as np import pendulum @@ -44,6 +40,10 @@ import tractor import trio from trio_typing import TaskStatus +from piker.accounting import ( + MktPair, +) +from piker.data.validate import FeedInit from .._util import ( NoData, DataUnavailable, @@ -63,14 +63,7 @@ from .api import ( RequestError, ) from ._util import data_reset_hack -from piker._cacheables import ( - async_lifo_cache, -) -from piker.accounting import ( - Asset, - MktPair, -) -from piker.data.validate import FeedInit +from .symbols import get_mkt_info # XXX NOTE: See available types table docs: @@ -559,28 +552,6 @@ async def get_bars( return result, data_cs is not None -# re-mapping to piker asset type names -# https://github.com/erdewit/ib_insync/blob/master/ib_insync/contract.py#L113 -_asset_type_map = { - 'STK': 'stock', - 'OPT': 'option', - 'FUT': 'future', - 'CONTFUT': 'continuous_future', - 'CASH': 'fiat', - 'IND': 'index', - 'CFD': 'cfd', - 'BOND': 'bond', - 'CMDTY': 'commodity', - 'FOP': 'futures_option', - 'FUND': 'mutual_fund', - 'WAR': 'warrant', - 'IOPT': 'warran', - 'BAG': 'bag', - 'CRYPTO': 'crypto', # bc it's diff then fiat? - # 'NEWS': 'news', -} - - _quote_streams: dict[str, trio.abc.ReceiveStream] = {} @@ -784,97 +755,6 @@ def normalize( return data -@async_lifo_cache() -async def get_mkt_info( - fqme: str, - - proxy: MethodProxy | None = None, - -) -> tuple[MktPair, ibis.ContractDetails]: - - # XXX: we don't need to split off any fqme broker part? - # bs_fqme, _, broker = fqme.partition('.') - - proxy: MethodProxy - if proxy is not None: - client_ctx = nullcontext(proxy) - else: - client_ctx = open_data_client - - async with client_ctx as proxy: - try: - ( - con, # Contract - details, # ContractDetails - ) = await proxy.get_sym_details(symbol=fqme) - except ConnectionError: - log.exception(f'Proxy is ded {proxy._aio_ns}') - raise - - # TODO: more consistent field translation - atype = _asset_type_map[con.secType] - - if atype == 'commodity': - venue: str = 'cmdty' - else: - venue = con.primaryExchange or con.exchange - - price_tick: Decimal = Decimal(str(details.minTick)) - - if atype == 'stock': - # XXX: GRRRR they don't support fractional share sizes for - # stocks from the API?! - # if con.secType == 'STK': - size_tick = Decimal('1') - else: - size_tick: Decimal = Decimal( - str(details.minSize).rstrip('0') - ) - # |-> TODO: there is also the Contract.sizeIncrement, bt wtf is it? - - # NOTE: this is duplicate from the .broker.norm_trade_records() - # routine, we should factor all this parsing somewhere.. - expiry_str = str(con.lastTradeDateOrContractMonth) - # if expiry: - # expiry_str: str = str(pendulum.parse( - # str(expiry).strip(' ') - # )) - - # TODO: currently we can't pass the fiat src asset because - # then we'll get a `MNQUSD` request for history data.. - # we need to figure out how we're going to handle this (later?) - # but likely we want all backends to eventually handle - # ``dst/src.venue.`` style !? - src = Asset( - name=str(con.currency).lower(), - atype='fiat', - tx_tick=Decimal('0.01'), # right? - ) - - mkt = MktPair( - dst=Asset( - name=con.symbol.lower(), - atype=atype, - tx_tick=size_tick, - ), - src=src, - - price_tick=price_tick, - size_tick=size_tick, - - bs_mktid=str(con.conId), - venue=str(venue), - expiry=expiry_str, - broker='ib', - - # TODO: options contract info as str? - # contract_info= - _fqme_without_src=(atype != 'fiat'), - ) - - return mkt, details - - async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -1045,141 +925,3 @@ async def stream_quotes( # ugh, clear ticks since we've consumed them ticker.ticks = [] # last = time.time() - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> None: - - # TODO: load user defined symbol set locally for fast search? - await ctx.started({}) - - async with ( - open_client_proxies() as (proxies, _), - open_data_client() as data_proxy, - ): - async with ctx.open_stream() as stream: - - # select a non-history client for symbol search to lighten - # the load in the main data node. - proxy = data_proxy - for name, proxy in proxies.items(): - if proxy is data_proxy: - continue - break - - ib_client = proxy._aio_ns.ib - log.info(f'Using {ib_client} for symbol search') - - last = time.time() - async for pattern in stream: - log.info(f'received {pattern}') - now = time.time() - - # this causes tractor hang... - # assert 0 - - assert pattern, 'IB can not accept blank search pattern' - - # throttle search requests to no faster then 1Hz - diff = now - last - if diff < 1.0: - log.debug('throttle sleeping') - await trio.sleep(diff) - try: - pattern = stream.receive_nowait() - except trio.WouldBlock: - pass - - if ( - not pattern - or pattern.isspace() - - # XXX: not sure if this is a bad assumption but it - # seems to make search snappier? - or len(pattern) < 1 - ): - log.warning('empty pattern received, skipping..') - - # TODO: *BUG* if nothing is returned here the client - # side will cache a null set result and not showing - # anything to the use on re-searches when this query - # timed out. We probably need a special "timeout" msg - # or something... - - # XXX: this unblocks the far end search task which may - # hold up a multi-search nursery block - await stream.send({}) - - continue - - log.info(f'searching for {pattern}') - - last = time.time() - - # async batch search using api stocks endpoint and module - # defined adhoc symbol set. - stock_results = [] - - async def stash_results(target: Awaitable[list]): - try: - results = await target - except tractor.trionics.Lagged: - print("IB SYM-SEARCH OVERRUN?!?") - return - - stock_results.extend(results) - - for i in range(10): - with trio.move_on_after(3) as cs: - async with trio.open_nursery() as sn: - sn.start_soon( - stash_results, - proxy.search_symbols( - pattern=pattern, - upto=5, - ), - ) - - # trigger async request - await trio.sleep(0) - - if cs.cancelled_caught: - log.warning( - f'Search timeout? {proxy._aio_ns.ib.client}' - ) - continue - else: - break - - # # match against our ad-hoc set immediately - # adhoc_matches = fuzzy.extractBests( - # pattern, - # list(_adhoc_futes_set), - # score_cutoff=90, - # ) - # log.info(f'fuzzy matched adhocs: {adhoc_matches}') - # adhoc_match_results = {} - # if adhoc_matches: - # # TODO: do we need to pull contract details? - # adhoc_match_results = {i[0]: {} for i in - # adhoc_matches} - - log.debug(f'fuzzy matching stocks {stock_results}') - stock_matches = fuzzy.extractBests( - pattern, - stock_results, - score_cutoff=50, - ) - - # matches = adhoc_match_results | { - matches = { - item[0]: {} for item in stock_matches - } - # TODO: we used to deliver contract details - # {item[2]: item[0] for item in stock_matches} - - log.debug(f"sending matches: {matches.keys()}") - await stream.send(matches) diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py new file mode 100644 index 00000000..9a749179 --- /dev/null +++ b/piker/brokers/ib/symbols.py @@ -0,0 +1,561 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Symbology search and normalization. + +''' +from __future__ import annotations +from contextlib import ( + nullcontext, +) +from decimal import Decimal +import time +from typing import ( + Awaitable, + TYPE_CHECKING, +) + +from fuzzywuzzy import process as fuzzy +import ib_insync as ibis +import tractor +import trio + +from piker.accounting import ( + Asset, + MktPair, +) +from piker._cacheables import ( + async_lifo_cache, +) + +from ._util import ( + log, +) + +if TYPE_CHECKING: + from .api import ( + MethodProxy, + ) + +_futes_venues = ( + 'GLOBEX', + 'NYMEX', + 'CME', + 'CMECRYPTO', + 'COMEX', + # 'CMDTY', # special name case.. + 'CBOT', # (treasury) yield futures +) + +_adhoc_cmdty_set = { + # metals + # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + 'xauusd.cmdty', # london gold spot ^ + 'xagusd.cmdty', # silver spot +} + +# NOTE: if you aren't seeing one of these symbol's futues contracts +# show up, it's likely the `.` part is wrong! +_adhoc_futes_set = { + + # equities + 'nq.cme', + 'mnq.cme', # micro + + 'es.cme', + 'mes.cme', # micro + + # cypto$ + 'brr.cme', + 'mbt.cme', # micro + 'ethusdrr.cme', + + # agriculture + 'he.comex', # lean hogs + 'le.comex', # live cattle (geezers) + 'gf.comex', # feeder cattle (younguns) + + # raw + 'lb.comex', # random len lumber + + 'gc.comex', + 'mgc.comex', # micro + + # oil & gas + 'cl.nymex', + + 'ni.comex', # silver futes + 'qi.comex', # mini-silver futes + + # treasury yields + # etfs by duration: + # SHY -> IEI -> IEF -> TLT + 'zt.cbot', # 2y + 'z3n.cbot', # 3y + 'zf.cbot', # 5y + 'zn.cbot', # 10y + 'zb.cbot', # 30y + + # (micros of above) + '2yy.cbot', + '5yy.cbot', + '10y.cbot', + '30y.cbot', +} + + +# taken from list here: +# https://www.interactivebrokers.com/en/trading/products-spot-currencies.php +_adhoc_fiat_set = set(( + 'USD, AED, AUD, CAD,' + 'CHF, CNH, CZK, DKK,' + 'EUR, GBP, HKD, HUF,' + 'ILS, JPY, MXN, NOK,' + 'NZD, PLN, RUB, SAR,' + 'SEK, SGD, TRY, ZAR' + ).split(' ,') +) + + +# map of symbols to contract ids +_adhoc_symbol_map = { + # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + + # NOTE: some cmdtys/metals don't have trade data like gold/usd: + # https://groups.io/g/twsapi/message/44174 + 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), +} +for qsn in _adhoc_futes_set: + sym, venue = qsn.split('.') + assert venue.upper() in _futes_venues, f'{venue}' + _adhoc_symbol_map[sym.upper()] = ( + {'exchange': venue}, + {}, + ) + + +# exchanges we don't support at the moment due to not knowing +# how to do symbol-contract lookup correctly likely due +# to not having the data feeds subscribed. +_exch_skip_list = { + + 'ASX', # aussie stocks + 'MEXI', # mexican stocks + + # no idea + 'VALUE', + 'FUNDSERV', + 'SWB2', + 'PSE', + 'PHLX', +} + +# optional search config the backend can register for +# it's symbol search handling (in this case we avoid +# accepting patterns before the kb has settled more then +# a quarter second). +_search_conf = { + 'pause_period': 6 / 16, +} + + +@tractor.context +async def open_symbol_search(ctx: tractor.Context) -> None: + ''' + Symbology search brokerd-endpoint. + + ''' + from .api import open_client_proxies + from .feed import open_data_client + + # TODO: load user defined symbol set locally for fast search? + await ctx.started({}) + + async with ( + open_client_proxies() as (proxies, _), + open_data_client() as data_proxy, + ): + async with ctx.open_stream() as stream: + + # select a non-history client for symbol search to lighten + # the load in the main data node. + proxy = data_proxy + for name, proxy in proxies.items(): + if proxy is data_proxy: + continue + break + + ib_client = proxy._aio_ns.ib + log.info(f'Using {ib_client} for symbol search') + + last = time.time() + async for pattern in stream: + log.info(f'received {pattern}') + now = time.time() + + # this causes tractor hang... + # assert 0 + + assert pattern, 'IB can not accept blank search pattern' + + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + except trio.WouldBlock: + pass + + if ( + not pattern + or pattern.isspace() + + # XXX: not sure if this is a bad assumption but it + # seems to make search snappier? + or len(pattern) < 1 + ): + log.warning('empty pattern received, skipping..') + + # TODO: *BUG* if nothing is returned here the client + # side will cache a null set result and not showing + # anything to the use on re-searches when this query + # timed out. We probably need a special "timeout" msg + # or something... + + # XXX: this unblocks the far end search task which may + # hold up a multi-search nursery block + await stream.send({}) + + continue + + log.info(f'searching for {pattern}') + + last = time.time() + + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + try: + results = await target + except tractor.trionics.Lagged: + print("IB SYM-SEARCH OVERRUN?!?") + return + + stock_results.extend(results) + + for i in range(10): + with trio.move_on_after(3) as cs: + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + proxy.search_symbols( + pattern=pattern, + upto=5, + ), + ) + + # trigger async request + await trio.sleep(0) + + if cs.cancelled_caught: + log.warning( + f'Search timeout? {proxy._aio_ns.ib.client}' + ) + continue + else: + break + + # # match against our ad-hoc set immediately + # adhoc_matches = fuzzy.extractBests( + # pattern, + # list(_adhoc_futes_set), + # score_cutoff=90, + # ) + # log.info(f'fuzzy matched adhocs: {adhoc_matches}') + # adhoc_match_results = {} + # if adhoc_matches: + # # TODO: do we need to pull contract details? + # adhoc_match_results = {i[0]: {} for i in + # adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( + pattern, + stock_results, + score_cutoff=50, + ) + + # matches = adhoc_match_results | { + matches = { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} + + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches) + + +# re-mapping to piker asset type names +# https://github.com/erdewit/ib_insync/blob/master/ib_insync/contract.py#L113 +_asset_type_map = { + 'STK': 'stock', + 'OPT': 'option', + 'FUT': 'future', + 'CONTFUT': 'continuous_future', + 'CASH': 'fiat', + 'IND': 'index', + 'CFD': 'cfd', + 'BOND': 'bond', + 'CMDTY': 'commodity', + 'FOP': 'futures_option', + 'FUND': 'mutual_fund', + 'WAR': 'warrant', + 'IOPT': 'warran', + 'BAG': 'bag', + 'CRYPTO': 'crypto', # bc it's diff then fiat? + # 'NEWS': 'news', +} + + +def parse_patt2fqme( + # client: Client, + pattern: str, + +) -> tuple[str, str, str, str]: + + # TODO: we can't use this currently because + # ``wrapper.starTicker()`` currently cashes ticker instances + # which means getting a singel quote will potentially look up + # a quote for a ticker that it already streaming and thus run + # into state clobbering (eg. list: Ticker.ticks). It probably + # makes sense to try this once we get the pub-sub working on + # individual symbols... + + # XXX UPDATE: we can probably do the tick/trades scraping + # inside our eventkit handler instead to bypass this entirely? + + currency = '' + + # fqme parsing stage + # ------------------ + if '.ib' in pattern: + from piker.accounting import unpack_fqme + _, symbol, venue, expiry = unpack_fqme(pattern) + + else: + symbol = pattern + expiry = '' + + # another hack for forex pairs lul. + if ( + '.idealpro' in symbol + # or '/' in symbol + ): + exch = 'IDEALPRO' + symbol = symbol.removesuffix('.idealpro') + if '/' in symbol: + symbol, currency = symbol.split('/') + + else: + # TODO: yes, a cache.. + # try: + # # give the cache a go + # return client._contracts[symbol] + # except KeyError: + # log.debug(f'Looking up contract for {symbol}') + expiry: str = '' + if symbol.count('.') > 1: + symbol, _, expiry = symbol.rpartition('.') + + # use heuristics to figure out contract "type" + symbol, exch = symbol.upper().rsplit('.', maxsplit=1) + + return symbol, currency, exch, expiry + + +def con2fqme( + con: ibis.Contract, + _cache: dict[int, (str, bool)] = {} + +) -> tuple[str, bool]: + ''' + Convert contracts to fqme-style strings to be used both in + symbol-search matching and as feed tokens passed to the front + end data deed layer. + + Previously seen contracts are cached by id. + + ''' + # should be real volume for this contract by default + calc_price = False + if con.conId: + try: + return _cache[con.conId] + except KeyError: + pass + + suffix = con.primaryExchange or con.exchange + symbol = con.symbol + expiry = con.lastTradeDateOrContractMonth or '' + + match con: + case ibis.Option(): + # TODO: option symbol parsing and sane display: + symbol = con.localSymbol.replace(' ', '') + + case ( + ibis.Commodity() + # search API endpoint returns std con box.. + | ibis.Contract(secType='CMDTY') + ): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + + # no real volume on this tract + calc_price = True + + case ibis.Forex() | ibis.Contract(secType='CASH'): + dst, src = con.localSymbol.split('.') + symbol = ''.join([dst, src]) + suffix = con.exchange or 'idealpro' + + # no real volume on forex feeds.. + calc_price = True + + if not suffix: + entry = _adhoc_symbol_map.get( + con.symbol or con.localSymbol + ) + if entry: + meta, kwargs = entry + cid = meta.get('conId') + if cid: + assert con.conId == meta['conId'] + suffix = meta['exchange'] + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + if expiry: + suffix += f'.{expiry}' + + fqme_key = symbol.lower() + if suffix: + fqme_key = '.'.join((fqme_key, suffix)).lower() + + _cache[con.conId] = fqme_key, calc_price + return fqme_key, calc_price + + +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + + proxy: MethodProxy | None = None, + +) -> tuple[MktPair, ibis.ContractDetails]: + + # XXX: we don't need to split off any fqme broker part? + # bs_fqme, _, broker = fqme.partition('.') + + proxy: MethodProxy + if proxy is not None: + client_ctx = nullcontext(proxy) + else: + from .feed import ( + open_data_client, + ) + client_ctx = open_data_client + + async with client_ctx as proxy: + try: + ( + con, # Contract + details, # ContractDetails + ) = await proxy.get_sym_details(symbol=fqme) + except ConnectionError: + log.exception(f'Proxy is ded {proxy._aio_ns}') + raise + + # TODO: more consistent field translation + atype = _asset_type_map[con.secType] + + if atype == 'commodity': + venue: str = 'cmdty' + else: + venue = con.primaryExchange or con.exchange + + price_tick: Decimal = Decimal(str(details.minTick)) + + if atype == 'stock': + # XXX: GRRRR they don't support fractional share sizes for + # stocks from the API?! + # if con.secType == 'STK': + size_tick = Decimal('1') + else: + size_tick: Decimal = Decimal( + str(details.minSize).rstrip('0') + ) + # |-> TODO: there is also the Contract.sizeIncrement, bt wtf is it? + + # NOTE: this is duplicate from the .broker.norm_trade_records() + # routine, we should factor all this parsing somewhere.. + expiry_str = str(con.lastTradeDateOrContractMonth) + # if expiry: + # expiry_str: str = str(pendulum.parse( + # str(expiry).strip(' ') + # )) + + # TODO: currently we can't pass the fiat src asset because + # then we'll get a `MNQUSD` request for history data.. + # we need to figure out how we're going to handle this (later?) + # but likely we want all backends to eventually handle + # ``dst/src.venue.`` style !? + src = Asset( + name=str(con.currency).lower(), + atype='fiat', + tx_tick=Decimal('0.01'), # right? + ) + + mkt = MktPair( + dst=Asset( + name=con.symbol.lower(), + atype=atype, + tx_tick=size_tick, + ), + src=src, + + price_tick=price_tick, + size_tick=size_tick, + + bs_mktid=str(con.conId), + venue=str(venue), + expiry=expiry_str, + broker='ib', + + # TODO: options contract info as str? + # contract_info= + _fqme_without_src=(atype != 'fiat'), + ) + + return mkt, details