# piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . ''' ``ib`` utilities and hacks suitable for use in the backend and/or as runnable script-programs. ''' from __future__ import annotations from datetime import ( # noqa datetime, date, tzinfo as TzInfo, ) from functools import partial from typing import ( Literal, TYPE_CHECKING, ) import subprocess import tractor from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client import i3ipc log = get_logger('piker.brokers.ib') _reset_tech: Literal[ 'vnc', 'i3ipc_xdotool', # TODO: in theory we can use a different linux DE API or # some other type of similar window scanning/mgmt client # (on other OSs) to do the same. ] = 'vnc' no_setup_msg:str = ( 'No data reset hack test setup for {vnc_sockaddr}!\n' 'See config setup tips @\n' 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' ) def try_xdo_manual( client: Client, ): ''' Do the "manual" `xdo`-based screen switch + click combo since apparently the `asyncvnc` client ain't workin.. Note this is only meant as a backup method for Xorg users, ideally you can use a real vnc client and the `vnc_click_hack()` impl! ''' global _reset_tech try: i3ipc_xdotool_manual_click_hack() _reset_tech = 'i3ipc_xdotool' return True except OSError: vnc_sockaddr: str = client.conf.vnc_addrs log.exception( no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False async def data_reset_hack( client: Client, reset_type: Literal['data', 'connection'], ) -> None: ''' Run key combos for resetting data feeds and yield back to caller when complete. NOTE: this is a linux-only hack around! There are multiple "techs" you can use depending on your infra setup: - if running ib-gw in a container with a VNC server running the most performant method is the `'vnc'` option. - if running ib-gw/tws locally, and you are using `i3` you can use the ``i3ipc`` lib and ``xdotool`` to send the appropriate click and key-combos automatically to your local desktop's java X-apps. https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations TODOs: - a return type that hopefully determines if the hack was successful. - other OS support? - integration with ``ib-gw`` run in docker + Xorg? - is it possible to offer a local server that can be accessed by a client? Would be sure be handy for running native java blobs that need to be wrangle. ''' # look up any user defined vnc socket address mapped from # a particular API socket port. vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs') if not vnc_addrs: log.warning( no_setup_msg.format(vnc_sockaddr=client.conf) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) global _reset_tech match _reset_tech: case 'vnc': try: await tractor.to_asyncio.run_task( partial( vnc_click_hack, client=client, ) ) except ( OSError, # no VNC server avail.. PermissionError, # asyncvnc pw fail.. ): try: import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: log.warning( no_setup_msg.format(vnc_sockaddr=client.conf) ) return False # XXX, Xorg only workaround.. # TODO? remove now that we have `pyvnc`? # if vnc_host not in { # 'localhost', # '127.0.0.1', # }: # focussed, matches = i3ipc_fin_wins_titled() # if not matches: # log.warning( # no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) # ) # return False # else: # try_xdo_manual(vnc_sockaddr) # localhost but no vnc-client or it borked.. else: try_xdo_manual(client) case 'i3ipc_xdotool': try_xdo_manual(client) # i3ipc_xdotool_manual_click_hack() case _ as tech: raise RuntimeError(f'{tech} is not supported for reset tech!?') # we don't really need the ``xdotool`` approach any more B) return True async def vnc_click_hack( client: Client, reset_type: str = 'data', pw: str|None = None, ) -> None: ''' Reset the data or network connection for the VNC attached ib-gateway using a (magic) keybinding combo. A vnc-server password can be set either by an input `pw` param or set in the client's config with the latter loaded from the user's `brokers.toml` in a vnc-addrs-port-mapping section, .. code:: toml [ib.vnc_addrs] 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} ''' api_port: str = str(client.ib.client.port) conf: dict = client.conf vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs') if not vnc_addrs: return None addr_entry: dict|tuple = vnc_addrs.get( api_port, ('localhost', 5900) # a typical default ) if pw is None: match addr_entry: case ( host, port, ): pass case { 'host': host, 'port': port, 'pw': pw }: pass case _: raise ValueError( f'Invalid `ib.vnc_addrs` entry ?\n' f'{addr_entry!r}\n' ) try: from pyvnc import ( AsyncVNCClient, VNCConfig, Point, MOUSE_BUTTON_LEFT, ) except ModuleNotFoundError: log.warning( "In order to leverage `piker`'s built-in data reset hacks, install " "the `pyvnc` project: https://github.com/regulad/pyvnc.git" ) return # two different hot keys which trigger diff types of reset # requests B) key = { 'data': 'f', 'connection': 'r' }[reset_type] with tractor.devx.open_crash_handler( ignore={TimeoutError,}, ): client = await AsyncVNCClient.connect( VNCConfig( host=host, port=port, password=pw, ) ) async with client: # move to middle of screen # 640x1800 await client.move( Point( 500, 500, ) ) # ensure the ib-gw window is active await client.click(MOUSE_BUTTON_LEFT) # send the hotkeys combo B) await client.press('Ctrl', 'Alt', key) # keys are stacked def i3ipc_fin_wins_titled( titles: list[str] = [ 'Interactive Brokers', # tws running in i3 'IB Gateway', # gw running in i3 # 'IB', # gw running in i3 (newer version?) # !TODO, remote vnc instance # -[ ] something in title (or other Con-props) that indicates # this is explicitly for ibrk sw? # |_[ ] !can use modden spawn eventually! 'TigerVNC', # 'vncviewer', # the terminal.. ], ) -> tuple[ i3ipc.Con, # orig focussed win list[tuple[str, i3ipc.Con]], # matching wins by title ]: ''' Attempt to find a local-DE window titled with an entry in `titles`. If found deliver the current focussed window and all matching `i3ipc.Con`s in a list. ''' import i3ipc ipc = i3ipc.Connection() # TODO: might be worth offering some kinda api for grabbing # the window id from the pid? # https://stackoverflow.com/a/2250879 tree = ipc.get_tree() focussed: i3ipc.Con = tree.find_focused() matches: list[i3ipc.Con] = [] for name in titles: results = tree.find_titled(name) print(f'results for {name}: {results}') if results: con = results[0] matches.append(( name, con, )) return ( focussed, matches, ) def i3ipc_xdotool_manual_click_hack() -> None: ''' Do the data reset hack but expecting a local X-window using `xdotool`. ''' focussed, matches = i3ipc_fin_wins_titled() try: orig_win_id = focussed.window except AttributeError: # XXX if .window cucks we prolly aren't intending to # use this and/or just woke up from suspend.. log.exception('xdotool invalid usage ya ??\n') return try: for name, con in matches: print(f'Resetting data feed for {name}') win_id = str(con.window) w, h = con.rect.width, con.rect.height # TODO: seems to be a few libs for python but not sure # if they support all the sub commands we need, order of # most recent commit history: # https://github.com/rr-/pyxdotool # https://github.com/ShaneHutter/pyxdotool # https://github.com/cphyc/pyxdotool # TODO: only run the reconnect (2nd) kc on a detected # disconnect? for key_combo, timeout in [ # only required if we need a connection reset. # ('ctrl+alt+r', 12), # data feed reset. ('ctrl+alt+f', 6) ]: subprocess.call([ 'xdotool', 'windowactivate', '--sync', win_id, # move mouse to bottom left of window (where # there should be nothing to click). 'mousemove_relative', '--sync', str(w-4), str(h-4), # NOTE: we may need to stick a `--retry 3` in here.. 'click', '--window', win_id, '--repeat', '3', '1', # hackzorzes 'key', key_combo, ], timeout=timeout, ) # re-activate and focus original window subprocess.call([ 'xdotool', 'windowactivate', '--sync', str(orig_win_id), 'click', '--window', str(orig_win_id), '1', ]) except subprocess.TimeoutExpired: log.exception('xdotool timed out?') def is_current_time_in_range( start_dt: datetime, end_dt: datetime, ) -> bool: ''' Check if current time is within the datetime range. Use any/the-same timezone as provided by `start_dt.tzinfo` value in the range. ''' now: datetime = datetime.now(start_dt.tzinfo) return start_dt <= now <= end_dt # TODO, put this into `._util` and call it from here! # # NOTE, this was generated by @guille from a gpt5 prompt # and was originally thot to be needed before learning about # `ib_insync.contract.ContractDetails._parseSessions()` and # it's downstream meths.. # # This is still likely useful to keep for now to parse the # `.tradingHours: str` value manually if we ever decide # to move off `ib_async` and implement our own `trio`/`anyio` # based version Bp # # >attempt to parse the retarted ib "time stampy thing" they # >do for "venue hours" with this.. written by # >gpt5-"thinking", # def parse_trading_hours( spec: str, tz: TzInfo|None = None ) -> dict[ date, tuple[datetime, datetime] ]|None: ''' Parse venue hours like: 'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...' Returns `dict[date] = (open_dt, close_dt)` or `None` if closed. ''' if ( not isinstance(spec, str) or not spec ): raise ValueError('spec must be a non-empty string') out: dict[ date, tuple[datetime, datetime] ]|None = {} for part in (p.strip() for p in spec.split(';') if p.strip()): if part.endswith(':CLOSED'): day_s, _ = part.split(':', 1) d = datetime.strptime(day_s, '%Y%m%d').date() out[d] = None continue try: start_s, end_s = part.split('-', 1) start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M') end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M') except ValueError as exc: raise ValueError(f'invalid segment: {part}') from exc if tz is not None: start_dt = start_dt.replace(tzinfo=tz) end_dt = end_dt.replace(tzinfo=tz) out[start_dt.date()] = (start_dt, end_dt) return out # ORIG desired usage, # # TODO, for non-drunk tomorrow, # - call above fn and check that `output[today] is not None` # trading_hrs: dict = parse_trading_hours( # details.tradingHours # ) # liq_hrs: dict = parse_trading_hours( # details.liquidHours # )