diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 207f56f7..af99e9ad 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -29,6 +29,7 @@ import itertools from math import isnan from typing import ( Any, + Optional, Union, ) import asyncio @@ -43,8 +44,11 @@ import trio import tractor from tractor import to_asyncio import ib_insync as ibis -from ib_insync.wrapper import RequestError -from ib_insync.contract import Contract, ContractDetails +from ib_insync.contract import ( + Contract, + ContractDetails, + Option, +) from ib_insync.order import Order from ib_insync.ticker import Ticker from ib_insync.objects import ( @@ -53,7 +57,10 @@ from ib_insync.objects import ( Execution, CommissionReport, ) -from ib_insync.wrapper import Wrapper +from ib_insync.wrapper import ( + Wrapper, + RequestError, +) from ib_insync.client import Client as ib_Client import numpy as np @@ -184,12 +191,12 @@ _adhoc_futes_set = { 'ethusdrr.cmecrypto', # agriculture - 'he.globex', # lean hogs - 'le.globex', # live cattle (geezers) - 'gf.globex', # feeder cattle (younguns) + 'he.nymex', # lean hogs + 'le.nymex', # live cattle (geezers) + 'gf.nymex', # feeder cattle (younguns) # raw - 'lb.globex', # random len lumber + 'lb.nymex', # random len lumber # metals 'xauusd.cmdty', # gold spot @@ -205,6 +212,19 @@ _adhoc_futes_set = { } +# taken from list here: +# https://www.interactivebrokers.com/en/trading/products-spot-currencies.php +_adhoc_fiat_set = set(( + 'USD, AED, AUD, CAD,' + 'CHF, CNH, CZK, DKK,' + 'EUR, GBP, HKD, HUF,' + 'ILS, JPY, MXN, NOK,' + 'NZD, PLN, RUB, SAR,' + 'SEK, SGD, TRY, ZAR' + ).split(' ,') +) + + # map of symbols to contract ids _adhoc_symbol_map = { # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 @@ -234,6 +254,7 @@ _exch_skip_list = { 'VALUE', 'FUNDSERV', 'SWB2', + 'PSE', } # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 @@ -336,7 +357,7 @@ class Client: _enters += 1 - contract = await self.find_contract(fqsn) + contract = (await self.find_contracts(fqsn))[0] bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) # _min = min(2000*100, count) @@ -391,7 +412,15 @@ class Client: futs.append(self.ib.reqContractDetailsAsync(con)) # batch request all details - results = await asyncio.gather(*futs) + try: + results = await asyncio.gather(*futs) + except RequestError as err: + msg = err.message + if ( + 'No security definition' in msg + ): + log.warning(f'{msg}: {contracts}') + return {} # one set per future result details = {} @@ -400,20 +429,11 @@ class Client: # XXX: if there is more then one entry in the details list # then the contract is so called "ambiguous". for d in details_set: - con = d.contract - key = '.'.join([ - con.symbol, - con.primaryExchange or con.exchange, - ]) - expiry = con.lastTradeDateOrContractMonth - if expiry: - key += f'.{expiry}' - - # nested dataclass we probably don't need and that - # won't IPC serialize.. + # nested dataclass we probably don't need and that won't + # IPC serialize.. d.secIdList = '' - + key, calc_price = con2fqsn(d.contract) details[key] = d return details @@ -443,7 +463,7 @@ class Client: self, pattern: str, # how many contracts to search "up to" - upto: int = 3, + upto: int = 6, asdicts: bool = True, ) -> dict[str, ContractDetails]: @@ -454,7 +474,6 @@ class Client: pattern, upto=upto, ) - for key, deats in results.copy().items(): tract = deats.contract @@ -464,21 +483,44 @@ class Client: if sectype == 'IND': results[f'{sym}.IND'] = tract results.pop(key) - exch = tract.exchange + # exch = tract.exchange - if exch in _futes_venues: + # XXX: add back one of these to get the weird deadlock + # on the debugger from root without the latest + # maybe_wait_for_debugger() fix in the `open_context()` + # exit. + # assert 0 + # if con.exchange not in _exch_skip_list: + + exch = tract.exchange + if exch not in _exch_skip_list: # try get all possible contracts for symbol as per, # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut con = ibis.Future( symbol=sym, exchange=exch, ) - try: - all_deats = await self.con_deats([con]) - results |= all_deats + # TODO: make this work, think it's something to do + # with the qualify flag. + # cons = await self.find_contracts( + # contract=con, + # err_on_qualify=False, + # ) + # if cons: + all_deats = await self.con_deats([con]) + results |= all_deats - except RequestError as err: - log.warning(err.message) + # forex pairs + elif sectype == 'CASH': + dst, src = tract.localSymbol.split('.') + pair_key = "/".join([dst, src]) + exch = tract.exchange.lower() + results[f'{pair_key}.{exch}'] = tract + results.pop(key) + + # XXX: again seems to trigger the weird tractor + # bug with the debugger.. + # assert 0 return results @@ -518,13 +560,11 @@ class Client: ibis.Contract(conId=conid) ) - async def find_contract( + def parse_patt2fqsn( self, pattern: str, - currency: str = 'USD', - **kwargs, - ) -> Contract: + ) -> tuple[str, str, str, str]: # TODO: we can't use this currently because # ``wrapper.starTicker()`` currently cashes ticker instances @@ -537,58 +577,104 @@ class Client: # XXX UPDATE: we can probably do the tick/trades scraping # inside our eventkit handler instead to bypass this entirely? + currency = '' + + # fqsn parsing stage + # ------------------ if '.ib' in pattern: from ..data._source import unpack_fqsn - broker, symbol, expiry = unpack_fqsn(pattern) + _, symbol, expiry = unpack_fqsn(pattern) + else: symbol = pattern + expiry = '' - # try: - # # give the cache a go - # return self._contracts[symbol] - # except KeyError: - # log.debug(f'Looking up contract for {symbol}') - expiry: str = '' - if symbol.count('.') > 1: - symbol, _, expiry = symbol.rpartition('.') + # another hack for forex pairs lul. + if ( + '.idealpro' in symbol + # or '/' in symbol + ): + exch = 'IDEALPRO' + symbol = symbol.removesuffix('.idealpro') + if '/' in symbol: + symbol, currency = symbol.split('/') - # use heuristics to figure out contract "type" - sym, exch = symbol.upper().rsplit('.', maxsplit=1) + else: + # TODO: yes, a cache.. + # try: + # # give the cache a go + # return self._contracts[symbol] + # except KeyError: + # log.debug(f'Looking up contract for {symbol}') + expiry: str = '' + if symbol.count('.') > 1: + symbol, _, expiry = symbol.rpartition('.') - qualify: bool = True + # use heuristics to figure out contract "type" + symbol, exch = symbol.upper().rsplit('.', maxsplit=1) + + return symbol, currency, exch, expiry + + async def find_contracts( + self, + pattern: Optional[str] = None, + contract: Optional[Contract] = None, + qualify: bool = True, + err_on_qualify: bool = True, + + ) -> Contract: + + if pattern is not None: + symbol, currency, exch, expiry = self.parse_patt2fqsn( + pattern, + ) + sectype = '' + + else: + assert contract + symbol = contract.symbol + sectype = contract.secType + exch = contract.exchange or contract.primaryExchange + expiry = contract.lastTradeDateOrContractMonth + currency = contract.currency + + # contract searching stage + # ------------------------ # futes if exch in _futes_venues: if expiry: # get the "front" contract - contract = await self.get_fute( - symbol=sym, + con = await self.get_fute( + symbol=symbol, exchange=exch, expiry=expiry, ) else: # get the "front" contract - contract = await self.get_fute( - symbol=sym, + con = await self.get_fute( + symbol=symbol, exchange=exch, front=True, ) - qualify = False - - elif exch in ('FOREX'): - currency = '' - symbol, currency = sym.split('/') + elif ( + exch in ('IDEALPRO') + or sectype == 'CASH' + ): + # if '/' in symbol: + # currency = '' + # symbol, currency = symbol.split('/') con = ibis.Forex( - symbol=symbol, + pair=''.join((symbol, currency)), currency=currency, ) con.bars_kwargs = {'whatToShow': 'MIDPOINT'} # commodities elif exch == 'CMDTY': # eg. XAUUSD.CMDTY - con_kwargs, bars_kwargs = _adhoc_symbol_map[sym] + con_kwargs, bars_kwargs = _adhoc_symbol_map[symbol] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -604,33 +690,50 @@ class Client: exch = 'SMART' else: - exch = 'SMART' + # XXX: order is super important here since + # a primary == 'SMART' won't ever work. primaryExchange = exch + exch = 'SMART' con = ibis.Stock( - symbol=sym, + symbol=symbol, exchange=exch, primaryExchange=primaryExchange, currency=currency, ) - try: exch = 'SMART' if not exch else exch - if qualify: - contract = (await self.ib.qualifyContractsAsync(con))[0] - else: - assert contract - except IndexError: - raise ValueError(f"No contract could be found {con}") + contracts = [con] + if qualify: + try: + contracts = await self.ib.qualifyContractsAsync(con) + except RequestError as err: + msg = err.message + if ( + 'No security definition' in msg + and not err_on_qualify + ): + log.warning( + f'Could not find def for {con}') + return None - self._contracts[pattern] = contract + else: + raise + if not contracts: + raise ValueError(f"No contract could be found {con}") - # add an aditional entry with expiry suffix if available - conexp = contract.lastTradeDateOrContractMonth - if conexp: - self._contracts[pattern + f'.{conexp}'] = contract + # pack all contracts into cache + for tract in contracts: + exch: str = tract.primaryExchange or tract.exchange or exch + pattern = f'{symbol}.{exch}' + expiry = tract.lastTradeDateOrContractMonth + # add an entry with expiry suffix if available + if expiry: + pattern += f'.{expiry}' - return contract + self._contracts[pattern.lower()] = tract + + return contracts async def get_head_time( self, @@ -649,9 +752,10 @@ class Client: async def get_sym_details( self, symbol: str, + ) -> tuple[Contract, Ticker, ContractDetails]: - contract = await self.find_contract(symbol) + contract = (await self.find_contracts(symbol))[0] ticker: Ticker = self.ib.reqMktData( contract, snapshot=True, @@ -839,6 +943,73 @@ class Client: return self.ib.positions(account=account) +def con2fqsn( + con: Contract, + _cache: dict[int, (str, bool)] = {} + +) -> tuple[str, bool]: + ''' + Convert contracts to fqsn-style strings to be used both in symbol-search + matching and as feed tokens passed to the front end data deed layer. + + Previously seen contracts are cached by id. + + ''' + # should be real volume for this contract by default + calc_price = False + if con.conId: + try: + return _cache[con.conId] + except KeyError: + pass + + suffix = con.primaryExchange or con.exchange + symbol = con.symbol + expiry = con.lastTradeDateOrContractMonth or '' + + match con: + case Option(): + # TODO: option symbol parsing and sane display: + symbol = con.localSymbol.replace(' ', '') + + case ibis.Commodity(): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + + # no real volume on this tract + calc_price = True + + case ibis.Forex() | ibis.Contract(secType='CASH'): + dst, src = con.localSymbol.split('.') + symbol = ''.join([dst, src]) + suffix = con.exchange + + # no real volume on forex feeds.. + calc_price = True + + if not suffix: + entry = _adhoc_symbol_map.get( + con.symbol or con.localSymbol + ) + if entry: + meta, kwargs = entry + cid = meta.get('conId') + if cid: + assert con.conId == meta['conId'] + suffix = meta['exchange'] + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + if expiry: + suffix += f'.{expiry}' + + fqsn_key = '.'.join((symbol, suffix)).lower() + _cache[con.conId] = fqsn_key, calc_price + return fqsn_key, calc_price + + # per-actor API ep caching _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 4737d376..bc712677 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -36,6 +36,7 @@ import tractor from ib_insync.contract import ( Contract, Option, + Forex, ) from ib_insync.order import ( Trade, @@ -88,20 +89,29 @@ def pack_position( # TODO: lookup fqsn even for derivs. symbol = con.symbol.lower() + # TODO: probably write a mofo exchange mapper routine since ib + # can't get it's shit together like, ever. + # try our best to figure out the exchange / venue exch = (con.primaryExchange or con.exchange).lower() if not exch: - # for wtv cucked reason some futes don't show their - # exchange (like CL.NYMEX) ... - entry = _adhoc_symbol_map.get( - con.symbol or con.localSymbol - ) - if entry: - meta, kwargs = entry - cid = meta.get('conId') - if cid: - assert con.conId == meta['conId'] - exch = meta['exchange'] + + if isinstance(con, Forex): + # bc apparently it's not in the contract obj? + exch = 'idealfx' + + else: + # for wtv cucked reason some futes don't show their + # exchange (like CL.NYMEX) ... + entry = _adhoc_symbol_map.get( + con.symbol or con.localSymbol + ) + if entry: + meta, kwargs = entry + cid = meta.get('conId') + if cid: + assert con.conId == meta['conId'] + exch = meta['exchange'] assert exch, f'No clue:\n {con}' fqsn = '.'.join((symbol, exch)) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index b22ddc1b..c3ac985f 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -41,7 +41,8 @@ from trio_typing import TaskStatus from piker.data._sharedmem import ShmArray from .._util import SymbolNotFound, NoData from .api import ( - _adhoc_futes_set, + # _adhoc_futes_set, + con2fqsn, log, load_aio_clients, ibis, @@ -207,8 +208,6 @@ async def get_bars( except RequestError as err: msg = err.message - # why do we always need to rebind this? - # _err = err if 'No market data permissions for' in msg: # TODO: signalling for no permissions searches @@ -239,7 +238,8 @@ async def get_bars( # elif ( # err.code == 162 and - # 'Trading TWS session is connected from a different IP address' in err.message + # 'Trading TWS session is connected from a different IP + # address' in err.message # ): # log.warning("ignoring ip address warning") # continue @@ -560,38 +560,18 @@ async def open_aio_quote_stream( # TODO: cython/mypyc/numba this! +# or we can at least cache a majority of the values +# except for the ones we expect to change?.. def normalize( ticker: Ticker, calc_price: bool = False ) -> dict: - # should be real volume for this contract by default - calc_price = False - # check for special contract types con = ticker.contract - if type(con) in ( - ibis.Commodity, - ibis.Forex, - ): - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True - else: - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange - - # append a `.` to the returned symbol - # key for derivatives that normally is the expiry - # date key. - expiry = con.lastTradeDateOrContractMonth - if expiry: - suffix += f'.{expiry}' + fqsn, calc_price = con2fqsn(con) # convert named tuples to dicts so we send usable keys new_ticks = [] @@ -623,9 +603,7 @@ def normalize( # generate fqsn with possible specialized suffix # for derivatives, note the lowercase. - data['symbol'] = data['fqsn'] = '.'.join( - (con.symbol, suffix) - ).lower() + data['symbol'] = data['fqsn'] = fqsn # convert named tuples to dicts for transport tbts = data.get('tickByTicks') @@ -690,6 +668,13 @@ async def stream_quotes( # TODO: more consistent field translation atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] + if atype in { + 'forex', + 'index', + 'commodity', + }: + syminfo['no_vlm'] = True + # for stocks it seems TWS reports too small a tick size # such that you can't submit orders with that granularity? min_tick = 0.01 if atype == 'stock' else 0 @@ -716,9 +701,9 @@ async def stream_quotes( }, } - return init_msgs + return init_msgs, syminfo - init_msgs = mk_init_msgs() + init_msgs, syminfo = mk_init_msgs() # TODO: we should instead spawn a task that waits on a feed to start # and let it wait indefinitely..instead of this hard coded stuff. @@ -727,7 +712,13 @@ async def stream_quotes( # it might be outside regular trading hours so see if we can at # least grab history. - if isnan(first_ticker.last): + if ( + isnan(first_ticker.last) + and type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ) + ): task_status.started((init_msgs, first_quote)) # it's not really live but this will unblock @@ -750,10 +741,16 @@ async def stream_quotes( task_status.started((init_msgs, first_quote)) async with aclosing(stream): - if type(first_ticker.contract) not in ( - ibis.Commodity, - ibis.Forex - ): + if syminfo.get('no_vlm', False): + + # generally speaking these feeds don't + # include vlm data. + atype = syminfo['asset_type'] + log.info( + f'Non-vlm asset {sym}@{atype}, skipping quote poll...' + ) + + else: # wait for real volume on feed (trading might be closed) while True: ticker = await stream.receive() @@ -812,6 +809,9 @@ async def data_reset_hack( successful. - other OS support? - integration with ``ib-gw`` run in docker + Xorg? + - is it possible to offer a local server that can be accessed by + a client? Would be sure be handy for running native java blobs + that need to be wrangle. ''' @@ -926,7 +926,8 @@ async def open_symbol_search( # adhoc_match_results = {} # if adhoc_matches: # # TODO: do we need to pull contract details? - # adhoc_match_results = {i[0]: {} for i in adhoc_matches} + # adhoc_match_results = {i[0]: {} for i in + # adhoc_matches} log.debug(f'fuzzy matching stocks {stock_results}') stock_matches = fuzzy.extractBests( diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 677468ad..13708252 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -56,7 +56,7 @@ def iterticks( sig = ( time, tick['price'], - tick['size'] + tick.get('size') ) if ttype == 'dark_trade': diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 688b97eb..3231698b 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -453,13 +453,6 @@ class LinkedSplits(QWidget): # add crosshair graphic self.chart.addItem(self.cursor) - # axis placement - if ( - _xaxis_at == 'bottom' and - 'bottom' in self.chart.plotItem.axes - ): - self.chart.hideAxis('bottom') - # style? self.chart.setFrameStyle( QFrame.StyledPanel | @@ -524,21 +517,26 @@ class LinkedSplits(QWidget): cpw.hideAxis('left') cpw.hideAxis('bottom') - if self.xaxis_chart: - self.xaxis_chart.hideAxis('bottom') + if ( + _xaxis_at == 'bottom' and ( + self.xaxis_chart + or ( + not self.subplots + and self.xaxis_chart is None + ) + ) + ): + if self.xaxis_chart: + self.xaxis_chart.hideAxis('bottom') # presuming we only want it at the true bottom of all charts. # XXX: uses new api from our ``pyqtgraph`` fork. # https://github.com/pikers/pyqtgraph/tree/plotitemoverlay_onto_pg_master # _ = self.xaxis_chart.removeAxis('bottom', unlink=False) # assert 'bottom' not in self.xaxis_chart.plotItem.axes - self.xaxis_chart = cpw cpw.showAxis('bottom') - if self.xaxis_chart is None: - self.xaxis_chart = cpw - qframe.chart = cpw qframe.hbox.addWidget(cpw) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index d63ee17f..f79c56ae 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -63,7 +63,7 @@ from ..log import get_logger log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 22 # Hz +_quote_throttle_rate: int = 60 # Hz # a working tick-type-classes template @@ -136,16 +136,16 @@ class DisplayState: # high level chart handles linked: LinkedSplits chart: ChartPlotWidget - vlm_chart: ChartPlotWidget # axis labels l1: L1Labels last_price_sticky: YAxisLabel - vlm_sticky: YAxisLabel # misc state tracking vars: dict[str, Any] + vlm_chart: Optional[ChartPlotWidget] = None + vlm_sticky: Optional[YAxisLabel] = None wap_in_history: bool = False @@ -185,9 +185,6 @@ async def graphics_update_loop( *ohlcv.array[-1][['index', 'close']] ) - if vlm_chart: - vlm_sticky = vlm_chart._ysticks['volume'] - maxmin = partial( chart_maxmin, chart, @@ -236,8 +233,6 @@ async def graphics_update_loop( 'ohlcv': ohlcv, 'chart': chart, 'last_price_sticky': last_price_sticky, - 'vlm_chart': vlm_chart, - 'vlm_sticky': vlm_sticky, 'l1': l1, 'vars': { @@ -250,6 +245,11 @@ async def graphics_update_loop( } }) + if vlm_chart: + vlm_sticky = vlm_chart._ysticks['volume'] + ds.vlm_chart = vlm_chart + ds.vlm_sticky = vlm_sticky + chart.default_view() # main real-time quotes update loop @@ -322,7 +322,7 @@ def graphics_update_cycle( for sym, quote in ds.quotes.items(): # compute the first available graphic's x-units-per-pixel - uppx = vlm_chart.view.x_uppx() + uppx = chart.view.x_uppx() # NOTE: vlm may be written by the ``brokerd`` backend # event though a tick sample is not emitted. @@ -786,7 +786,10 @@ async def display_symbol_data( async with trio.open_nursery() as ln: # if available load volume related built-in display(s) - if has_vlm(ohlcv): + if ( + not symbol.broker_info[provider].get('no_vlm', False) + and has_vlm(ohlcv) + ): vlm_chart = await ln.start( open_vlm_displays, linked, @@ -821,6 +824,9 @@ async def display_symbol_data( order_mode_started ) ): + if not vlm_chart: + chart.default_view() + # let Qt run to render all widgets and make sure the # sidepanes line up vertically. await trio.sleep(0)