ib: `.api` mod and log-fmt cleaning
About time we tidy'd a buncha status logging in this backend..
particularly for boot-up where there's lots of client-try-connect poll
looping with account detection from the user config.
`.api.Client` pprint and logging fmt improvements:
- add `Client.__repr__()` which shows the minimally useful set of info
  from the underlying `.ib: IB` as well as a new `.acnts: list[str]`
  of the account aliases defined in the user's `brokers.toml`.
- mk `.bars()` define a comprehensive `query_info: str` with all the
  request deats but only display if there's a problem with the response
  data.
- mk `.get_config()` report both the config file path and the acnt
  aliases (NOT the actual account #s).
- move all `.load_aio_clients()` client poll loop requests do
  `log.runtime()` statuses, only falling through to a `.warning()` when
  the loop fails to connect the client to the spec-ed API-gw addr, and
 |_ don't allow loading accounts for which the user has not defined an
    alias in `brokers.toml::[ib]`; raise a value-error in such cases
    with a message indicating how to mod the config.
 |_ only `log.info()` about acnts if some were loaded..
Other mod logging de-noising:
- better status fmting in `.symbols.open_symbol_search()` with
  `repr(Client)`.
- for `.feed.stream_quotes()` first quote reporting use `.runtime()`.
			
			
				ib_refinements
			
			
		
							parent
							
								
									4940aabe05
								
							
						
					
					
						commit
						70332e375b
					
				|  | @ -100,7 +100,7 @@ async def data_reset_hack( | |||
|         log.warning( | ||||
|             no_setup_msg | ||||
|             + | ||||
|             f'REQUIRES A `vnc_addrs: array` ENTRY' | ||||
|             'REQUIRES A `vnc_addrs: array` ENTRY' | ||||
|         ) | ||||
| 
 | ||||
|     vnc_host, vnc_port = vnc_sockaddr.get( | ||||
|  | @ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None: | |||
|                         timeout=timeout, | ||||
|                     ) | ||||
| 
 | ||||
|     # re-activate and focus original window | ||||
|         # re-activate and focus original window | ||||
|         subprocess.call([ | ||||
|             'xdotool', | ||||
|             'windowactivate', '--sync', str(orig_win_id), | ||||
|  |  | |||
|  | @ -287,9 +287,31 @@ class Client: | |||
|         self.conf = config | ||||
| 
 | ||||
|         # NOTE: the ib.client here is "throttled" to 45 rps by default | ||||
|         self.ib = ib | ||||
|         self.ib: IB = ib | ||||
|         self.ib.RaiseRequestErrors: bool = True | ||||
| 
 | ||||
|         # self._acnt_names: set[str] = {} | ||||
|         self._acnt_names: list[str] = [] | ||||
| 
 | ||||
|     @property | ||||
|     def acnts(self) -> list[str]: | ||||
|         # return list(self._acnt_names) | ||||
|         return self._acnt_names | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return ( | ||||
|             f'<{type(self).__name__}(' | ||||
|             f'ib={self.ib} ' | ||||
|             f'acnts={self.acnts}' | ||||
| 
 | ||||
|             # TODO: we need to mask out acnt-#s and other private | ||||
|             # infos if we're going to console this! | ||||
|             # f' |_.conf:\n' | ||||
|             # f'    {pformat(self.conf)}\n' | ||||
| 
 | ||||
|             ')>' | ||||
|         ) | ||||
| 
 | ||||
|     async def get_fills(self) -> list[Fill]: | ||||
|         ''' | ||||
|         Return list of rents `Fills` from trading session. | ||||
|  | @ -376,55 +398,63 @@ class Client: | |||
|             # whatToShow='MIDPOINT', | ||||
|             # whatToShow='TRADES', | ||||
|         ) | ||||
|         log.info( | ||||
|             f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n' | ||||
|             f'fqme: {fqme}\n' | ||||
|             f'global _enters: {_enters}\n' | ||||
|             f'kwargs: {pformat(kwargs)}\n' | ||||
|         ) | ||||
| 
 | ||||
|         bars = await self.ib.reqHistoricalDataAsync( | ||||
|             **kwargs, | ||||
|         ) | ||||
| 
 | ||||
|         query_info: str = ( | ||||
|             f'REQUESTING IB history BARS\n' | ||||
|             f'    ------ - ------\n' | ||||
|             f'dt_duration: {dt_duration}\n' | ||||
|             f'ib_duration_str: {ib_duration_str}\n' | ||||
|             f'bar_size: {bar_size}\n' | ||||
|             f'fqme: {fqme}\n' | ||||
|             f'actor-global _enters: {_enters}\n' | ||||
|             f'kwargs: {pformat(kwargs)}\n' | ||||
|         ) | ||||
|         # tail case if no history for range or none prior. | ||||
|         # NOTE: there's actually 3 cases here to handle (and | ||||
|         # this should be read alongside the implementation of | ||||
|         # `.reqHistoricalDataAsync()`): | ||||
|         # - a timeout occurred in which case insync internals return | ||||
|         #   an empty list thing with bars.clear()... | ||||
|         # - no data exists for the period likely due to | ||||
|         #   a weekend, holiday or other non-trading period prior to | ||||
|         #   ``end_dt`` which exceeds the ``duration``, | ||||
|         # - LITERALLY this is the start of the mkt's history! | ||||
|         if not bars: | ||||
|             # NOTE: there's actually 3 cases here to handle (and | ||||
|             # this should be read alongside the implementation of | ||||
|             # `.reqHistoricalDataAsync()`): | ||||
|             # - a timeout occurred in which case insync internals return | ||||
|             #   an empty list thing with bars.clear()... | ||||
|             # - no data exists for the period likely due to | ||||
|             #   a weekend, holiday or other non-trading period prior to | ||||
|             #   ``end_dt`` which exceeds the ``duration``, | ||||
|             # - LITERALLY this is the start of the mkt's history! | ||||
|             # TODO: figure out wut's going on here. | ||||
| 
 | ||||
|             # TODO: is this handy, a sync requester for tinkering | ||||
|             # with empty frame cases? | ||||
|             # def get_hist(): | ||||
|             #     return self.ib.reqHistoricalData(**kwargs) | ||||
|             # import pdbp | ||||
|             # pdbp.set_trace() | ||||
| 
 | ||||
|             # sync requester for debugging empty frame cases | ||||
|             def get_hist(): | ||||
|                 return self.ib.reqHistoricalData(**kwargs) | ||||
|             log.critical( | ||||
|                 'STUPID IB SAYS NO HISTORY\n\n' | ||||
|                 + query_info | ||||
|             ) | ||||
| 
 | ||||
|             assert get_hist | ||||
|             import pdbp | ||||
|             pdbp.set_trace() | ||||
| 
 | ||||
|             return [], np.empty(0), dt_duration | ||||
|             # TODO: we could maybe raise ``NoData`` instead if we | ||||
|             # rewrite the method in the first case? right now there's no | ||||
|             # way to detect a timeout. | ||||
|             # rewrite the method in the first case? | ||||
|             # right now there's no way to detect a timeout.. | ||||
|             return [], np.empty(0), dt_duration | ||||
| 
 | ||||
|         # NOTE XXX: ensure minimum duration in bars B) | ||||
|         # => we recursively call this method until we get at least | ||||
|         # as many bars such that they sum in aggregate to the the | ||||
|         # desired total time (duration) at most. | ||||
|         # XXX XXX XXX | ||||
|         # WHY DID WE EVEN NEED THIS ORIGINALLY!? | ||||
|         # XXX XXX XXX | ||||
|         # - if you query over a gap and get no data | ||||
|         #   that may short circuit the history  | ||||
|         log.info(query_info) | ||||
|         # NOTE XXX: ensure minimum duration in bars? | ||||
|         # => recursively call this method until we get at least as | ||||
|         #   many bars such that they sum in aggregate to the the | ||||
|         #   desired total time (duration) at most. | ||||
|         #  - if you query over a gap and get no data | ||||
|         #    that may short circuit the history | ||||
|         if ( | ||||
|             end_dt | ||||
|             and False | ||||
|             # XXX XXX XXX | ||||
|             # => WHY DID WE EVEN NEED THIS ORIGINALLY!? <= | ||||
|             # XXX XXX XXX | ||||
|             False | ||||
|             and end_dt | ||||
|         ): | ||||
|             nparr: np.ndarray = bars_to_np(bars) | ||||
|             times: np.ndarray = nparr['time'] | ||||
|  | @ -927,7 +957,10 @@ class Client: | |||
|                         warnset = True | ||||
| 
 | ||||
|             else: | ||||
|                 log.info(f'Got first quote for {contract}') | ||||
|                 log.info( | ||||
|                     'Got first quote for contract\n' | ||||
|                     f'{contract}\n' | ||||
|                 ) | ||||
|                 break | ||||
|         else: | ||||
|             if timeouterr and raise_on_timeout: | ||||
|  | @ -991,8 +1024,12 @@ class Client: | |||
|                     outsideRth=True, | ||||
| 
 | ||||
|                     optOutSmartRouting=True, | ||||
|                     # TODO: need to understand this setting better as | ||||
|                     # it pertains to shit ass mms.. | ||||
|                     routeMarketableToBbo=True, | ||||
| 
 | ||||
|                     designatedLocation='SMART', | ||||
| 
 | ||||
|                     # TODO: make all orders GTC? | ||||
|                     # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba | ||||
|                     # goodTillDate=f"yyyyMMdd-HH:mm:ss", | ||||
|  | @ -1120,8 +1157,8 @@ def get_config() -> dict[str, Any]: | |||
|     names = list(accounts.keys()) | ||||
|     accts = section['accounts'] = bidict(accounts) | ||||
|     log.info( | ||||
|         f'brokers.toml defines {len(accts)} accounts: ' | ||||
|         f'{pformat(names)}' | ||||
|         f'{path} defines {len(accts)} account aliases:\n' | ||||
|         f'{pformat(names)}\n' | ||||
|     ) | ||||
| 
 | ||||
|     if section is None: | ||||
|  | @ -1188,7 +1225,7 @@ async def load_aio_clients( | |||
|         try_ports = list(try_ports.values()) | ||||
| 
 | ||||
|     _err = None | ||||
|     accounts_def = config.load_accounts(['ib']) | ||||
|     accounts_def: dict[str, str] = config.load_accounts(['ib']) | ||||
|     ports = try_ports if port is None else [port] | ||||
|     combos = list(itertools.product(hosts, ports)) | ||||
|     accounts_found: dict[str, Client] = {} | ||||
|  | @ -1227,7 +1264,9 @@ async def load_aio_clients( | |||
|                 client = Client(ib=ib, config=conf) | ||||
| 
 | ||||
|                 # update all actor-global caches | ||||
|                 log.info(f"Caching client for {sockaddr}") | ||||
|                 log.runtime( | ||||
|                     f'Connected and caching `Client` @ {sockaddr!r}' | ||||
|                 ) | ||||
|                 _client_cache[sockaddr] = client | ||||
|                 break | ||||
| 
 | ||||
|  | @ -1242,37 +1281,59 @@ async def load_aio_clients( | |||
|                 OSError, | ||||
|             ) as ce: | ||||
|                 _err = ce | ||||
|                 log.warning( | ||||
|                     f'Failed to connect on {host}:{port} for {i} time with,\n' | ||||
|                     f'{ib.client.apiError.value()}\n' | ||||
|                     'retrying with a new client id..') | ||||
|                 message: str = ( | ||||
|                     f'Failed to connect on {host}:{port} after {i} tries with\n' | ||||
|                     f'{ib.client.apiError.value()!r}\n\n' | ||||
|                     'Retrying with a new client id..\n' | ||||
|                 ) | ||||
|                 log.runtime(message) | ||||
|         else: | ||||
|             # XXX report loudly if we never established after all | ||||
|             # re-tries | ||||
|             log.warning(message) | ||||
| 
 | ||||
|         # Pre-collect all accounts available for this | ||||
|         # connection and map account names to this client | ||||
|         # instance. | ||||
|         for value in ib.accountValues(): | ||||
|             acct_number = value.account | ||||
|             acct_number: str = value.account | ||||
| 
 | ||||
|             entry = accounts_def.inverse.get(acct_number) | ||||
|             if not entry: | ||||
|             acnt_alias: str = accounts_def.inverse.get(acct_number) | ||||
|             if not acnt_alias: | ||||
| 
 | ||||
|                 # TODO: should we constuct the below reco-ex from | ||||
|                 # the existing config content? | ||||
|                 _, path = config.load( | ||||
|                     conf_name='brokers', | ||||
|                 ) | ||||
|                 raise ValueError( | ||||
|                     'No section in brokers.toml for account:' | ||||
|                     f' {acct_number}\n' | ||||
|                     f'Please add entry to continue using this API client' | ||||
|                     'No alias in account section for account!\n' | ||||
|                     f'Please add an acnt alias entry to your {path}\n' | ||||
|                     'For example,\n\n' | ||||
| 
 | ||||
|                     '[ib.accounts]\n' | ||||
|                     'margin = {accnt_number!r}\n' | ||||
|                     '^^^^^^ <- you need this part!\n\n' | ||||
| 
 | ||||
|                     'This ensures `piker` will not leak private acnt info ' | ||||
|                     'to console output by default!\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # surjection of account names to operating clients. | ||||
|             if acct_number not in accounts_found: | ||||
|                 accounts_found[entry] = client | ||||
|             if acnt_alias not in accounts_found: | ||||
|                 accounts_found[acnt_alias] = client | ||||
|                 # client._acnt_names.add(acnt_alias) | ||||
|                 client._acnt_names.append(acnt_alias) | ||||
| 
 | ||||
|         log.info( | ||||
|             f'Loaded accounts for client @ {host}:{port}\n' | ||||
|             f'{pformat(accounts_found)}' | ||||
|         ) | ||||
|         if accounts_found: | ||||
|             log.info( | ||||
|                 f'Loaded accounts for api client\n\n' | ||||
|                 f'{pformat(accounts_found)}\n' | ||||
|             ) | ||||
| 
 | ||||
|         # XXX: why aren't we just updating this directy above | ||||
|         # instead of using the intermediary `accounts_found`? | ||||
|         _accounts2clients.update(accounts_found) | ||||
|             # XXX: why aren't we just updating this directy above | ||||
|             # instead of using the intermediary `accounts_found`? | ||||
|             _accounts2clients.update(accounts_found) | ||||
| 
 | ||||
|     # if we have no clients after the scan loop then error out. | ||||
|     if not _client_cache: | ||||
|  | @ -1472,7 +1533,7 @@ async def open_aio_client_method_relay( | |||
|         msg: tuple[str, dict] | dict | None = await from_trio.get() | ||||
|         match msg: | ||||
|             case None:  # termination sentinel | ||||
|                 print('asyncio PROXY-RELAY SHUTDOWN') | ||||
|                 log.info('asyncio `Client` method-proxy SHUTDOWN!') | ||||
|                 break | ||||
| 
 | ||||
|             case (meth_name, kwargs): | ||||
|  |  | |||
|  | @ -915,9 +915,13 @@ async def stream_quotes( | |||
| 
 | ||||
|         if first_ticker: | ||||
|             first_quote: dict = normalize(first_ticker) | ||||
|             log.info( | ||||
|                 'Rxed init quote:\n' | ||||
|                 f'{pformat(first_quote)}' | ||||
| 
 | ||||
|             # TODO: we need a stack-oriented log levels filters for | ||||
|             # this! | ||||
|             # log.info(message, filter={'stack': 'live_feed'}) ? | ||||
|             log.runtime( | ||||
|                 'Rxed init quote:\n\n' | ||||
|                 f'{pformat(first_quote)}\n' | ||||
|             ) | ||||
| 
 | ||||
|         # NOTE: it might be outside regular trading hours for | ||||
|  | @ -969,7 +973,11 @@ async def stream_quotes( | |||
|             raise_on_timeout=True, | ||||
|         ) | ||||
|         first_quote: dict = normalize(first_ticker) | ||||
|         log.info( | ||||
| 
 | ||||
|         # TODO: we need a stack-oriented log levels filters for | ||||
|         # this! | ||||
|         # log.info(message, filter={'stack': 'live_feed'}) ? | ||||
|         log.runtime( | ||||
|             'Rxed init quote:\n' | ||||
|             f'{pformat(first_quote)}' | ||||
|         ) | ||||
|  |  | |||
|  | @ -209,7 +209,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None: | |||
|                 break | ||||
| 
 | ||||
|             ib_client = proxy._aio_ns.ib | ||||
|             log.info(f'Using {ib_client} for symbol search') | ||||
|             log.info( | ||||
|                 f'Using API client for symbol-search\n' | ||||
|                 f'{ib_client}\n' | ||||
|             ) | ||||
| 
 | ||||
|             last = time.time() | ||||
|             async for pattern in stream: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue