From c92a23619622c59168bfd0543614f877e404dfeb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 10:00:18 -0400 Subject: [PATCH 1/5] ib: more trade record edge case handling - timestamps came as `'date'`-keyed from 2022 and before but now are `'datetime'`.. - some symbols seem to have no commission field, so handle that.. - when no `'price'` field found return `None` from `norm_trade()`. - add a warn log on mid-fill commission updates. --- piker/brokers/ib/broker.py | 9 ++++- piker/brokers/ib/ledger.py | 74 +++++++++++++++++++++++++++----------- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b317da22..ddda9020 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -1183,7 +1183,14 @@ async def deliver_trade_events( pos and fill ): - assert fill.commissionReport == cr + now_cr: CommissionReport = fill.commissionReport + if (now_cr != cr): + log.warning( + 'UhhHh ib updated the commission report mid-fill..?\n' + f'was: {pformat(cr)}\n' + f'now: {pformat(now_cr)}\n' + ) + await emit_pp_update( ems_stream, accounts_def, diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index a767d551..d62b4ba7 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -31,7 +31,11 @@ from typing import ( ) from bidict import bidict -import pendulum +from pendulum import ( + DateTime, + parse, + from_timestamp, +) from ib_insync import ( Contract, Commodity, @@ -66,10 +70,11 @@ tx_sort: Callable = partial( iter_by_dt, parsers={ 'dateTime': parse_flex_dt, - 'datetime': pendulum.parse, - # for some some fucking 2022 and - # back options records...fuck me. - 'date': pendulum.parse, + 'datetime': parse, + + # XXX: for some some fucking 2022 and + # back options records.. f@#$ me.. + 'date': parse, } ) @@ -89,15 +94,38 @@ def norm_trade( conid: int = str(record.get('conId') or record['conid']) bs_mktid: str = str(conid) - comms = record.get('commission') - if comms is None: - comms = -1*record['ibCommission'] - price = record.get('price') or record['tradePrice'] + # NOTE: sometimes weird records (like BTTX?) + # have no field for this? + comms: float = -1 * ( + record.get('commission') + or record.get('ibCommission') + or 0 + ) + if not comms: + log.warning( + 'No commissions found for record?\n' + f'{pformat(record)}\n' + ) + + price: float = ( + record.get('price') + or record.get('tradePrice') + ) + if price is None: + log.warning( + 'No `price` field found in record?\n' + 'Skipping normalization..\n' + f'{pformat(record)}\n' + ) + return None # the api doesn't do the -/+ on the quantity for you but flex # records do.. are you fucking serious ib...!? - size = record.get('quantity') or record['shares'] * { + size: float|int = ( + record.get('quantity') + or record['shares'] + ) * { 'BOT': 1, 'SLD': -1, }[record['side']] @@ -128,26 +156,31 @@ def norm_trade( # otype = tail[6] # strike = tail[7:] - print(f'skipping opts contract {symbol}') + log.warning( + f'Skipping option contract -> NO SUPPORT YET!\n' + f'{symbol}\n' + ) return None # timestamping is way different in API records - dtstr = record.get('datetime') - date = record.get('date') - flex_dtstr = record.get('dateTime') + dtstr: str = record.get('datetime') + date: str = record.get('date') + flex_dtstr: str = record.get('dateTime') if dtstr or date: - dt = pendulum.parse(dtstr or date) + dt: DateTime = parse(dtstr or date) elif flex_dtstr: # probably a flex record with a wonky non-std timestamp.. - dt = parse_flex_dt(record['dateTime']) + dt: DateTime = parse_flex_dt(record['dateTime']) # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. - asset_type: str = record.get( - 'assetCategory' - ) or record.get('secType', 'STK') + asset_type: str = ( + record.get('assetCategory') + or record.get('secType') + or 'STK' + ) if (expiry := ( record.get('lastTradeDateOrContractMonth') @@ -357,6 +390,7 @@ def norm_trade_records( if txn is None: continue + # inject txns sorted by datetime insort( records, txn, @@ -405,7 +439,7 @@ def api_trades_to_ledger_entries( txn_dict[attr_name] = val tid = str(txn_dict['execId']) - dt = pendulum.from_timestamp(txn_dict['time']) + dt = from_timestamp(txn_dict['time']) txn_dict['datetime'] = str(dt) acctid = accounts[txn_dict['acctNumber']] -- 2.34.1 From 4f9998e9fbaf27556bd3e69bce2ed3f990e1a328 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 10:03:34 -0400 Subject: [PATCH 2/5] ib: mask out trade and vlm rates for now --- piker/brokers/ib/feed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 89d43b98..08751129 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -671,8 +671,8 @@ async def _setup_quote_stream( # making them mostly useless and explains why the scanner # is always slow XD # '293', # Trade count for day - '294', # Trade rate / minute - '295', # Vlm rate / minute + # '294', # Trade rate / minute + # '295', # Vlm rate / minute ), contract: Contract | None = None, -- 2.34.1 From 4940aabe0597ebaa26679371ab901a7bd8f4b8bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 12:42:21 -0400 Subject: [PATCH 3/5] ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. --- piker/brokers/ib/symbols.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index 6fe86aa9..d7c09fef 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -294,7 +294,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None: elif stock_results: break # else: - await tractor.pause() + # await tractor.pause() # # match against our ad-hoc set immediately # adhoc_matches = fuzzy.extract( @@ -522,7 +522,21 @@ async def get_mkt_info( venue = con.primaryExchange or con.exchange price_tick: Decimal = Decimal(str(details.minTick)) - # price_tick: Decimal = Decimal('0.01') + ib_min_tick_gt_2: Decimal = Decimal('0.01') + if ( + price_tick < ib_min_tick_gt_2 + ): + # TODO: we need to add some kinda dynamic rounding sys + # to our MktPair i guess? + # not sure where the logic should sit, but likely inside + # the `.clearing._ems` i suppose... + log.warning( + 'IB seems to disallow a min price tick < 0.01 ' + 'when the price is > 2.0..?\n' + f'Decreasing min tick precision for {fqme} to 0.01' + ) + # price_tick = ib_min_tick + # await tractor.pause() if atype == 'stock': # XXX: GRRRR they don't support fractional share sizes for -- 2.34.1 From 70332e375b1d0adb03857d26c145b929b85fa60f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 20 Jun 2024 14:40:21 -0400 Subject: [PATCH 4/5] ib: `.api` mod and log-fmt cleaning About time we tidy'd a buncha status logging in this backend.. particularly for boot-up where there's lots of client-try-connect poll looping with account detection from the user config. `.api.Client` pprint and logging fmt improvements: - add `Client.__repr__()` which shows the minimally useful set of info from the underlying `.ib: IB` as well as a new `.acnts: list[str]` of the account aliases defined in the user's `brokers.toml`. - mk `.bars()` define a comprehensive `query_info: str` with all the request deats but only display if there's a problem with the response data. - mk `.get_config()` report both the config file path and the acnt aliases (NOT the actual account #s). - move all `.load_aio_clients()` client poll loop requests do `log.runtime()` statuses, only falling through to a `.warning()` when the loop fails to connect the client to the spec-ed API-gw addr, and |_ don't allow loading accounts for which the user has not defined an alias in `brokers.toml::[ib]`; raise a value-error in such cases with a message indicating how to mod the config. |_ only `log.info()` about acnts if some were loaded.. Other mod logging de-noising: - better status fmting in `.symbols.open_symbol_search()` with `repr(Client)`. - for `.feed.stream_quotes()` first quote reporting use `.runtime()`. --- piker/brokers/ib/_util.py | 4 +- piker/brokers/ib/api.py | 187 ++++++++++++++++++++++++------------ piker/brokers/ib/feed.py | 16 ++- piker/brokers/ib/symbols.py | 5 +- 4 files changed, 142 insertions(+), 70 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index a80bd514..2c71bc46 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -100,7 +100,7 @@ async def data_reset_hack( log.warning( no_setup_msg + - f'REQUIRES A `vnc_addrs: array` ENTRY' + 'REQUIRES A `vnc_addrs: array` ENTRY' ) vnc_host, vnc_port = vnc_sockaddr.get( @@ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None: timeout=timeout, ) - # re-activate and focus original window + # re-activate and focus original window subprocess.call([ 'xdotool', 'windowactivate', '--sync', str(orig_win_id), diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2fe540bd..ae32b0ee 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -287,9 +287,31 @@ class Client: self.conf = config # NOTE: the ib.client here is "throttled" to 45 rps by default - self.ib = ib + self.ib: IB = ib self.ib.RaiseRequestErrors: bool = True + # self._acnt_names: set[str] = {} + self._acnt_names: list[str] = [] + + @property + def acnts(self) -> list[str]: + # return list(self._acnt_names) + return self._acnt_names + + def __repr__(self) -> str: + return ( + f'<{type(self).__name__}(' + f'ib={self.ib} ' + f'acnts={self.acnts}' + + # TODO: we need to mask out acnt-#s and other private + # infos if we're going to console this! + # f' |_.conf:\n' + # f' {pformat(self.conf)}\n' + + ')>' + ) + async def get_fills(self) -> list[Fill]: ''' Return list of rents `Fills` from trading session. @@ -376,55 +398,63 @@ class Client: # whatToShow='MIDPOINT', # whatToShow='TRADES', ) - log.info( - f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n' - f'fqme: {fqme}\n' - f'global _enters: {_enters}\n' - f'kwargs: {pformat(kwargs)}\n' - ) - bars = await self.ib.reqHistoricalDataAsync( **kwargs, ) + query_info: str = ( + f'REQUESTING IB history BARS\n' + f' ------ - ------\n' + f'dt_duration: {dt_duration}\n' + f'ib_duration_str: {ib_duration_str}\n' + f'bar_size: {bar_size}\n' + f'fqme: {fqme}\n' + f'actor-global _enters: {_enters}\n' + f'kwargs: {pformat(kwargs)}\n' + ) # tail case if no history for range or none prior. + # NOTE: there's actually 3 cases here to handle (and + # this should be read alongside the implementation of + # `.reqHistoricalDataAsync()`): + # - a timeout occurred in which case insync internals return + # an empty list thing with bars.clear()... + # - no data exists for the period likely due to + # a weekend, holiday or other non-trading period prior to + # ``end_dt`` which exceeds the ``duration``, + # - LITERALLY this is the start of the mkt's history! if not bars: - # NOTE: there's actually 3 cases here to handle (and - # this should be read alongside the implementation of - # `.reqHistoricalDataAsync()`): - # - a timeout occurred in which case insync internals return - # an empty list thing with bars.clear()... - # - no data exists for the period likely due to - # a weekend, holiday or other non-trading period prior to - # ``end_dt`` which exceeds the ``duration``, - # - LITERALLY this is the start of the mkt's history! + # TODO: figure out wut's going on here. + # TODO: is this handy, a sync requester for tinkering + # with empty frame cases? + # def get_hist(): + # return self.ib.reqHistoricalData(**kwargs) + # import pdbp + # pdbp.set_trace() - # sync requester for debugging empty frame cases - def get_hist(): - return self.ib.reqHistoricalData(**kwargs) + log.critical( + 'STUPID IB SAYS NO HISTORY\n\n' + + query_info + ) - assert get_hist - import pdbp - pdbp.set_trace() - - return [], np.empty(0), dt_duration # TODO: we could maybe raise ``NoData`` instead if we - # rewrite the method in the first case? right now there's no - # way to detect a timeout. + # rewrite the method in the first case? + # right now there's no way to detect a timeout.. + return [], np.empty(0), dt_duration - # NOTE XXX: ensure minimum duration in bars B) - # => we recursively call this method until we get at least - # as many bars such that they sum in aggregate to the the - # desired total time (duration) at most. - # XXX XXX XXX - # WHY DID WE EVEN NEED THIS ORIGINALLY!? - # XXX XXX XXX - # - if you query over a gap and get no data - # that may short circuit the history + log.info(query_info) + # NOTE XXX: ensure minimum duration in bars? + # => recursively call this method until we get at least as + # many bars such that they sum in aggregate to the the + # desired total time (duration) at most. + # - if you query over a gap and get no data + # that may short circuit the history if ( - end_dt - and False + # XXX XXX XXX + # => WHY DID WE EVEN NEED THIS ORIGINALLY!? <= + # XXX XXX XXX + False + and end_dt ): nparr: np.ndarray = bars_to_np(bars) times: np.ndarray = nparr['time'] @@ -927,7 +957,10 @@ class Client: warnset = True else: - log.info(f'Got first quote for {contract}') + log.info( + 'Got first quote for contract\n' + f'{contract}\n' + ) break else: if timeouterr and raise_on_timeout: @@ -991,8 +1024,12 @@ class Client: outsideRth=True, optOutSmartRouting=True, + # TODO: need to understand this setting better as + # it pertains to shit ass mms.. routeMarketableToBbo=True, + designatedLocation='SMART', + # TODO: make all orders GTC? # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba # goodTillDate=f"yyyyMMdd-HH:mm:ss", @@ -1120,8 +1157,8 @@ def get_config() -> dict[str, Any]: names = list(accounts.keys()) accts = section['accounts'] = bidict(accounts) log.info( - f'brokers.toml defines {len(accts)} accounts: ' - f'{pformat(names)}' + f'{path} defines {len(accts)} account aliases:\n' + f'{pformat(names)}\n' ) if section is None: @@ -1188,7 +1225,7 @@ async def load_aio_clients( try_ports = list(try_ports.values()) _err = None - accounts_def = config.load_accounts(['ib']) + accounts_def: dict[str, str] = config.load_accounts(['ib']) ports = try_ports if port is None else [port] combos = list(itertools.product(hosts, ports)) accounts_found: dict[str, Client] = {} @@ -1227,7 +1264,9 @@ async def load_aio_clients( client = Client(ib=ib, config=conf) # update all actor-global caches - log.info(f"Caching client for {sockaddr}") + log.runtime( + f'Connected and caching `Client` @ {sockaddr!r}' + ) _client_cache[sockaddr] = client break @@ -1242,37 +1281,59 @@ async def load_aio_clients( OSError, ) as ce: _err = ce - log.warning( - f'Failed to connect on {host}:{port} for {i} time with,\n' - f'{ib.client.apiError.value()}\n' - 'retrying with a new client id..') + message: str = ( + f'Failed to connect on {host}:{port} after {i} tries with\n' + f'{ib.client.apiError.value()!r}\n\n' + 'Retrying with a new client id..\n' + ) + log.runtime(message) + else: + # XXX report loudly if we never established after all + # re-tries + log.warning(message) # Pre-collect all accounts available for this # connection and map account names to this client # instance. for value in ib.accountValues(): - acct_number = value.account + acct_number: str = value.account - entry = accounts_def.inverse.get(acct_number) - if not entry: + acnt_alias: str = accounts_def.inverse.get(acct_number) + if not acnt_alias: + + # TODO: should we constuct the below reco-ex from + # the existing config content? + _, path = config.load( + conf_name='brokers', + ) raise ValueError( - 'No section in brokers.toml for account:' - f' {acct_number}\n' - f'Please add entry to continue using this API client' + 'No alias in account section for account!\n' + f'Please add an acnt alias entry to your {path}\n' + 'For example,\n\n' + + '[ib.accounts]\n' + 'margin = {accnt_number!r}\n' + '^^^^^^ <- you need this part!\n\n' + + 'This ensures `piker` will not leak private acnt info ' + 'to console output by default!\n' ) # surjection of account names to operating clients. - if acct_number not in accounts_found: - accounts_found[entry] = client + if acnt_alias not in accounts_found: + accounts_found[acnt_alias] = client + # client._acnt_names.add(acnt_alias) + client._acnt_names.append(acnt_alias) - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + if accounts_found: + log.info( + f'Loaded accounts for api client\n\n' + f'{pformat(accounts_found)}\n' + ) - # XXX: why aren't we just updating this directy above - # instead of using the intermediary `accounts_found`? - _accounts2clients.update(accounts_found) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? + _accounts2clients.update(accounts_found) # if we have no clients after the scan loop then error out. if not _client_cache: @@ -1472,7 +1533,7 @@ async def open_aio_client_method_relay( msg: tuple[str, dict] | dict | None = await from_trio.get() match msg: case None: # termination sentinel - print('asyncio PROXY-RELAY SHUTDOWN') + log.info('asyncio `Client` method-proxy SHUTDOWN!') break case (meth_name, kwargs): diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 08751129..2c1a9224 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -915,9 +915,13 @@ async def stream_quotes( if first_ticker: first_quote: dict = normalize(first_ticker) - log.info( - 'Rxed init quote:\n' - f'{pformat(first_quote)}' + + # TODO: we need a stack-oriented log levels filters for + # this! + # log.info(message, filter={'stack': 'live_feed'}) ? + log.runtime( + 'Rxed init quote:\n\n' + f'{pformat(first_quote)}\n' ) # NOTE: it might be outside regular trading hours for @@ -969,7 +973,11 @@ async def stream_quotes( raise_on_timeout=True, ) first_quote: dict = normalize(first_ticker) - log.info( + + # TODO: we need a stack-oriented log levels filters for + # this! + # log.info(message, filter={'stack': 'live_feed'}) ? + log.runtime( 'Rxed init quote:\n' f'{pformat(first_quote)}' ) diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index d7c09fef..04ec74e4 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -209,7 +209,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None: break ib_client = proxy._aio_ns.ib - log.info(f'Using {ib_client} for symbol search') + log.info( + f'Using API client for symbol-search\n' + f'{ib_client}\n' + ) last = time.time() async for pattern in stream: -- 2.34.1 From cb92abbc3847601d6bbc0ba9f1332fe19c711321 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Aug 2024 18:04:34 -0400 Subject: [PATCH 5/5] ib: add connect status info emit --- piker/brokers/ib/api.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index ae32b0ee..23222512 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1250,6 +1250,12 @@ async def load_aio_clients( for i in range(connect_retries): try: + log.info( + 'Trying `ib_async` connect\n' + f'{host}: {port}\n' + f'clientId: {client_id}\n' + f'timeout: {connect_timeout}\n' + ) await ib.connectAsync( host, port, @@ -1367,7 +1373,9 @@ async def load_clients_for_trio( a ``tractor.to_asyncio.open_channel_from()``. ''' - async with load_aio_clients() as accts2clients: + async with load_aio_clients( + disconnect_on_exit=False, + ) as accts2clients: to_trio.send_nowait(accts2clients) -- 2.34.1