diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cbb8f406..ec163fbb 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -60,7 +60,6 @@ import numpy as np from .. import config from ..log import get_logger, get_console_log -# from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData @@ -112,13 +111,18 @@ _show_wap_in_history: bool = False # accepting patterns before the kb has settled more then # a quarter second). _search_conf = { - 'pause_period': 6/16, + 'pause_period': 6 / 16, } +# annotation to let backend agnostic code +# know if ``brokerd`` should be spawned with +# ``tractor``'s aio mode. +_infect_asyncio: bool = True + + # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: - class NonShittyWrapper(Wrapper): def tcpDataArrived(self): """Override time stamps to be floats for now. @@ -173,6 +177,13 @@ _adhoc_cmdty_data_map = { 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_futes_venues = ( + 'GLOBEX', + 'NYMEX', + 'CME', + 'CMECRYPTO', +) + _adhoc_futes_set = { # equities @@ -243,10 +254,10 @@ class Client: async def bars( self, - symbol: str, + fqsn: str, # EST in ISO 8601 format is required... below is EPOCH start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", - end_dt: Union[datetime, str ] = "", + end_dt: Union[datetime, str] = "", sample_period_s: str = 1, # ohlc sample period period_count: int = int(2e3), # <- max per 1s sample query @@ -254,7 +265,7 @@ class Client: is_paid_feed: bool = False, # placeholder ) -> list[dict[str, Any]]: ''' - Retreive OHLCV bars for a symbol over a range to the present. + Retreive OHLCV bars for a fqsn over a range to the present. ''' bars_kwargs = {'whatToShow': 'TRADES'} @@ -263,7 +274,7 @@ class Client: print(f'ENTER BARS {_enters} @ end={end_dt}') _enters += 1 - contract = await self.find_contract(symbol) + contract = await self.find_contract(fqsn) bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) # _min = min(2000*100, count) @@ -300,7 +311,7 @@ class Client: ) if not bars: # TODO: raise underlying error here - raise ValueError(f"No bars retreived for {symbol}?") + raise ValueError(f"No bars retreived for {fqsn}?") # TODO: rewrite this faster with ``numba`` # convert to pandas dataframe: @@ -342,23 +353,24 @@ class Client: async def search_stocks( self, pattern: str, - get_details: bool = False, - # how many contracts to search "up to" - upto: int = 3, + upto: int = 3, # how many contracts to search "up to" ) -> dict[str, ContractDetails]: - """Search for stocks matching provided ``str`` pattern. + ''' + Search for stocks matching provided ``str`` pattern. Return a dictionary of ``upto`` entries worth of contract details. - """ + + ''' descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) if descriptions is not None: descrs = descriptions[:upto] if get_details: - return await self.con_deats([d.contract for d in descrs]) + deats = await self.con_deats([d.contract for d in descrs]) + return deats else: results = {} @@ -368,6 +380,10 @@ class Client: # from search? exch = con.primaryExchange.rsplit('.')[0] unique_sym = f'{con.symbol}.{exch}' + expiry = con.lastTradeDateOrContractMonth + if expiry: + unique_sym += f'{expiry}' + results[unique_sym] = {} return results @@ -385,26 +401,75 @@ class Client: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ - return await self.search_stocks(pattern, upto, get_details=True) + results = await self.search_stocks( + pattern, + upto=upto, + get_details=True, + ) - async def get_cont_fute( + for key, contracts in results.copy().items(): + tract = contracts['contract'] + sym = tract['symbol'] + + sectype = tract['secType'] + if sectype == 'IND': + results[f'{sym}.IND'] = tract + results.pop(key) + exch = tract['exchange'] + + if exch in _futes_venues: + # try get all possible contracts for symbol as per, + # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut + con = Contract( + 'FUT+CONTFUT', + symbol=sym, + exchange=exch, + ) + possibles = await self.ib.qualifyContractsAsync(con) + for i, condict in enumerate(sorted( + map(asdict, possibles), + # sort by expiry + key=lambda con: con['lastTradeDateOrContractMonth'], + )): + expiry = condict['lastTradeDateOrContractMonth'] + results[f'{sym}.{exch}.{expiry}'] = condict + + return results + + async def get_fute( self, symbol: str, exchange: str, - ) -> Contract: - """Get an unqualifed contract for the current "continous" future. - """ - contcon = ibis.ContFuture(symbol, exchange=exchange) + expiry: str = '', + front: bool = False, + ) -> Contract: + ''' + Get an unqualifed contract for the current "continous" future. + + ''' # it's the "front" contract returned here - frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] - return ibis.Future(conId=frontcon.conId) + if front: + con = (await self.ib.qualifyContractsAsync( + ibis.ContFuture(symbol, exchange=exchange) + ))[0] + else: + con = (await self.ib.qualifyContractsAsync( + ibis.Future( + symbol, + exchange=exchange, + lastTradeDateOrContractMonth=expiry, + ) + ))[0] + + return con async def find_contract( self, - symbol, + pattern: str, currency: str = 'USD', **kwargs, + ) -> Contract: # TODO: we can't use this currently because @@ -418,11 +483,20 @@ class Client: # XXX UPDATE: we can probably do the tick/trades scraping # inside our eventkit handler instead to bypass this entirely? + if 'ib' in pattern: + from ..data._source import uncons_fqsn + broker, symbol, expiry = uncons_fqsn(pattern) + else: + symbol = pattern + # try: # # give the cache a go # return self._contracts[symbol] # except KeyError: # log.debug(f'Looking up contract for {symbol}') + expiry: str = '' + if symbol.count('.') > 1: + symbol, _, expiry = symbol.rpartition('.') # use heuristics to figure out contract "type" try: @@ -431,9 +505,27 @@ class Client: # likely there's an embedded `.` for a forex pair breakpoint() + qualify: bool = True + # futes - if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): - con = await self.get_cont_fute(symbol=sym, exchange=exch) + if exch in _futes_venues: + if expiry: + # get the "front" contract + contract = await self.get_fute( + symbol=sym, + exchange=exch, + expiry=expiry, + ) + + else: + # get the "front" contract + contract = await self.get_fute( + symbol=sym, + exchange=exch, + front=True, + ) + + qualify = False elif exch in ('FOREX'): currency = '' @@ -473,12 +565,15 @@ class Client: ) try: exch = 'SMART' if not exch else exch - contract = (await self.ib.qualifyContractsAsync(con))[0] + if qualify: + contract = (await self.ib.qualifyContractsAsync(con))[0] + else: + assert contract except IndexError: raise ValueError(f"No contract could be found {con}") - self._contracts[symbol] = contract + self._contracts[pattern] = contract return contract async def get_head_time( @@ -828,8 +923,8 @@ async def load_aio_clients( accounts_found: dict[str, Client] = {} if ( - client and client.ib.isConnected() or - sockaddr in _scan_ignore + client and client.ib.isConnected() + or sockaddr in _scan_ignore ): continue @@ -1039,8 +1134,12 @@ async def open_aio_client_method_relay( # relay all method requests to ``asyncio``-side client and # deliver back results - while True: + while not to_trio._closed: msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break + meth_name, kwargs = msg meth = getattr(client, meth_name) @@ -1071,45 +1170,47 @@ async def open_client_proxy() -> MethodProxy: yield proxy - except RequestError as err: - code, msg = err.code, err.message + # terminate asyncio side task + await chan.send(None) - # TODO: retreive underlying ``ib_insync`` error? - if ( - code == 162 and ( - 'HMDS query returned no data' in msg - or 'No market data permissions for' in msg - ) - ): - # these cases should not cause a task crash - log.warning(msg) + except ( + RequestError, + BaseException, + )as err: + code = getattr(err, 'code', None) + if code: + msg = err.message + await tractor.breakpoint() + + # TODO: retreive underlying ``ib_insync`` error? + if ( + code == 162 and ( + 'HMDS query returned no data' in msg + or 'No market data permissions for' in msg + ) + or code == 200 + ): + # these cases should not cause a task crash + log.warning(msg) else: raise -# @acm -# async def get_client( -# **kwargs, +@acm +async def get_client( + **kwargs, -# ) -> Client: -# ''' -# Init the ``ib_insync`` client in another actor and return -# a method proxy to it. +) -> Client: + ''' + Init the ``ib_insync`` client in another actor and return + a method proxy to it. -# ''' -# async with ( -# maybe_spawn_brokerd( -# brokername='ib', -# infect_asyncio=True, -# **kwargs -# ) as portal, -# ): -# assert 0 - # TODO: the IPC via portal relay layer for when this current - # actor isn't in aio mode. - # open_client_proxy() as proxy, - # yield proxy + ''' + # TODO: the IPC via portal relay layer for when this current + # actor isn't in aio mode. + async with open_client_proxy() as proxy: + yield proxy # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -1137,11 +1238,40 @@ tick_types = { } +# TODO: cython/mypyc/numba this! def normalize( ticker: Ticker, calc_price: bool = False ) -> dict: + + # should be real volume for this contract by default + calc_price = False + + # check for special contract types + con = ticker.contract + if type(con) in ( + ibis.Commodity, + ibis.Forex, + ): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True + + else: + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + expiry = con.lastTradeDateOrContractMonth + if expiry: + suffix += f'.{expiry}' + # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: @@ -1170,6 +1300,12 @@ def normalize( # serialize for transport data = asdict(ticker) + # generate fqsn with possible specialized suffix + # for derivatives. + data['symbol'] = data['fqsn'] = '.'.join( + (con.symbol, suffix) + ).lower() + # convert named tuples to dicts for transport tbts = data.get('tickByTicks') if tbts: @@ -1191,7 +1327,7 @@ def normalize( async def get_bars( proxy: MethodProxy, - sym: str, + fqsn: str, end_dt: str = "", ) -> (dict, np.ndarray): @@ -1204,15 +1340,15 @@ async def get_bars( fails = 0 bars: Optional[list] = None - for _ in range(2): + for _ in range(3): try: bars, bars_array = await proxy.bars( - symbol=sym, + fqsn=fqsn, end_dt=end_dt, ) if bars_array is None: - raise SymbolNotFound(sym) + raise SymbolNotFound(fqsn) next_dt = bars[0].date print(f'ib datetime {next_dt}') @@ -1252,7 +1388,7 @@ async def get_bars( elif 'No market data permissions for' in err.message: # TODO: signalling for no permissions searches - raise NoData(f'Symbol: {sym}') + raise NoData(f'Symbol: {fqsn}') break else: @@ -1311,7 +1447,7 @@ async def open_history_client( async def backfill_bars( - sym: str, + fqsn: str, shm: ShmArray, # type: ignore # noqa # TODO: we want to avoid overrunning the underlying shm array buffer @@ -1331,34 +1467,34 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 ''' - # async with open_history_client(sym) as proxy: - async with open_client_proxy() as proxy: + with trio.CancelScope() as cs: - if platform.system() == 'Windows': - log.warning( - 'Decreasing history query count to 4 since, windows...') - count = 4 + # async with open_history_client(fqsn) as proxy: + async with open_client_proxy() as proxy: - out, fails = await get_bars(proxy, sym) + if platform.system() == 'Windows': + log.warning( + 'Decreasing history query count to 4 since, windows...') + count = 4 - if out is None: - raise RuntimeError("Could not pull currrent history?!") + out, fails = await get_bars(proxy, fqsn) - (first_bars, bars_array, next_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 + if out is None: + raise RuntimeError("Could not pull currrent history?!") - # write historical data to buffer - shm.push(bars_array) + (first_bars, bars_array, next_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 - with trio.CancelScope() as cs: + # write historical data to buffer + shm.push(bars_array) task_status.started(cs) i = 0 while i < count: - out, fails = await get_bars(proxy, sym, end_dt=next_dt) + out, fails = await get_bars(proxy, fqsn, end_dt=next_dt) if fails is None or fails > 1: break @@ -1430,8 +1566,10 @@ async def _setup_quote_stream( contract: Optional[Contract] = None, ) -> trio.abc.ReceiveChannel: - """Stream a ticker using the std L1 api. - """ + ''' + Stream a ticker using the std L1 api. + + ''' global _quote_streams to_trio.send_nowait(None) @@ -1519,7 +1657,10 @@ async def open_aio_quote_stream( if from_aio: # if we already have a cached feed deliver a rx side clone to consumer - async with broadcast_receiver(from_aio) as from_aio: + async with broadcast_receiver( + from_aio, + 2**6, + ) as from_aio: yield from_aio return @@ -1555,17 +1696,13 @@ async def stream_quotes( ''' # TODO: support multiple subscriptions sym = symbols[0] + log.info(f'request for real-time quotes: {sym}') - contract, first_ticker, details = await _trio_run_client_method( + con, first_ticker, details = await _trio_run_client_method( method='get_sym_details', symbol=sym, ) - - with trio.move_on_after(1): - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + first_quote = normalize(first_ticker) def mk_init_msgs() -> dict[str, dict]: # pass back some symbol info like min_tick, trading_hours, etc. @@ -1593,46 +1730,23 @@ async def stream_quotes( # and that history has been written sym: { 'symbol_info': syminfo, + 'fqsn': first_quote['fqsn'], } } return init_msgs init_msgs = mk_init_msgs() - con = first_ticker.contract - - # should be real volume for this contract by default - calc_price = False - - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange - - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True - - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], suffix)).lower() - quote['symbol'] = topic - - # for compat with upcoming fqsn based derivs search - init_msgs[sym]['fqsn'] = topic - - # pass first quote asap - first_quote = quote + with trio.move_on_after(1): + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) # it might be outside regular trading hours so see if we can at # least grab history. if isnan(first_ticker.last): - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, first_quote)) # it's not really live but this will unblock # the brokerd feed task to tell the ui to update? @@ -1643,30 +1757,32 @@ async def stream_quotes( return # we never expect feed to come up? async with open_aio_quote_stream( - symbol=sym, contract=contract + symbol=sym, + contract=con, ) as stream: # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = [] - log.debug(f"First ticker received {quote}") - - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, first_quote)) async with aclosing(stream): - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - # suffix = 'exchange' - # calc_price = False # should be real volume for contract - + if type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ): # wait for real volume on feed (trading might be closed) while True: - ticker = await stream.receive() # for a real volume contract we rait for the first # "real" trade to take place - if not calc_price and not ticker.rtTime: + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): # spin consuming tickers until we get a real # market datum log.debug(f"New unsent ticker: {ticker}") @@ -1681,21 +1797,16 @@ async def stream_quotes( # ``aclosing()`` above? break + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + # tell caller quotes are now coming in live feed_is_live.set() # last = time.time() async for ticker in stream: - # print(f'ticker rate: {1/(time.time() - last)}') - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - - quote['symbol'] = topic - await send_chan.send({topic: quote}) + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) # ugh, clear ticks since we've consumed them ticker.ticks = [] @@ -1713,11 +1824,11 @@ def pack_position( symbol = con.localSymbol.replace(' ', '') else: + # TODO: lookup fqsn even for derivs. symbol = con.symbol.lower() exch = (con.primaryExchange or con.exchange).lower() symkey = '.'.join((symbol, exch)) - if not exch: # attempt to lookup the symbol from our # hacked set.. @@ -1726,7 +1837,11 @@ def pack_position( symkey = sym break - # TODO: options contracts into a sane format.. + expiry = con.lastTradeDateOrContractMonth + if expiry: + symkey += f'.{expiry}' + + # TODO: options contracts into a sane format.. return BrokerdPosition( broker='ib', @@ -2105,7 +2220,7 @@ async def open_symbol_search( sn.start_soon( stash_results, _trio_run_client_method( - method='search_stocks', + method='search_symbols', pattern=pattern, upto=5, )