From 957224bdc5981945405bf69a780dc9f7f37fed6a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 May 2023 09:41:45 -0400 Subject: [PATCH] ib: support remote host vnc client connections I figure we might as well support multiple types of distributed multi-host setups; why not allow running the API (gateway) and thus vnc server on a diff host and allowing clients to connect and do their thing B) Deatz: - make `ib._util.data_reset_hack()` take in a `vnc_host` which gets proxied through to the `asyncvnc` client. - pull `ib_insync.client.Client` host value and pass-through to data reset machinery, presuming the vnc server is running in the same container (and/or the same host). - if no vnc connection **and** no i3ipc trick can be used, just report to the user that they need to remove the data throttle manually. - fix `feed.get_bars()` to handle throttle cases the same based on error msg matching, not error the code and add a max `_failed_resets` count to trigger bailing on the query loop. --- piker/brokers/ib/_util.py | 51 +++++++++++++---- piker/brokers/ib/feed.py | 112 ++++++++++++++++++++++---------------- 2 files changed, 103 insertions(+), 60 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 114022fa..4c3bbb34 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -32,7 +32,10 @@ import tractor from .._util import log if TYPE_CHECKING: - from .api import MethodProxy + from .api import ( + MethodProxy, + ib_Client + ) _reset_tech: Literal[ @@ -47,9 +50,8 @@ _reset_tech: Literal[ async def data_reset_hack( - proxy: MethodProxy, - reset_type: str = 'data', - **kwargs, + vnc_host: str, + reset_type: Literal['data', 'connection'], ) -> None: ''' @@ -79,9 +81,13 @@ async def data_reset_hack( that need to be wrangle. ''' - global _reset_tech - client: 'IBCLIENTTHING' = proxy._aio_ns.ib.client + no_setup_msg:str = ( + 'No data reset hack test setup for {vnc_host}!\n' + 'See setup @\n' + 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' + ) + global _reset_tech match _reset_tech: case 'vnc': @@ -89,15 +95,26 @@ async def data_reset_hack( await tractor.to_asyncio.run_task( partial( vnc_click_hack, - host=client.host, + host=vnc_host, ) ) except OSError: - _reset_tech = 'i3ipc_xdotool' + if vnc_host != 'localhost': + log.warning(no_setup_msg) + return False + + try: + import i3ipc + except ModuleNotFoundError: + log.warning(no_setup_msg) + return False + try: i3ipc_xdotool_manual_click_hack() + _reset_tech = 'i3ipc_xdotool' return True except OSError: + log.exception(no_setup_msg) return False case 'i3ipc_xdotool': @@ -119,9 +136,21 @@ async def vnc_click_hack( ib gateway using magic combos. ''' - key = {'data': 'f', 'connection': 'r'}[reset_type] + try: + import asyncvnc + except ModuleNotFoundError: + log.warning( + "In order to leverage `piker`'s built-in data reset hacks, install " + "the `asyncvnc` project: https://github.com/barneygale/asyncvnc" + ) + return - import asyncvnc + # two different hot keys which trigger diff types of reset + # requests B) + key = { + 'data': 'f', + 'connection': 'r' + }[reset_type] async with asyncvnc.connect( host, @@ -140,8 +169,6 @@ async def vnc_click_hack( def i3ipc_xdotool_manual_click_hack() -> None: - import i3ipc - i3 = i3ipc.Connection() t = i3.get_tree() diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index df1eea6a..61288a3a 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -212,7 +212,7 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, reset_type: str = 'data', - timeout: float = 16, + timeout: float = float('inf'), task_status: TaskStatus[ tuple[ @@ -228,7 +228,7 @@ async def wait_on_data_reset( 'HMDS data farm connection is OK:ushmds' ) - # XXX: other event messages we might want to try and + # TODO: other event messages we might want to try and # wait for but i wasn't able to get any of this # reliable.. # reconnect_start = proxy.status_event( @@ -239,14 +239,21 @@ async def wait_on_data_reset( # ) # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). + client = proxy._aio_ns.ib.client done = trio.Event() with trio.move_on_after(timeout) as cs: task_status.started((cs, done)) - log.warning('Sending DATA RESET request') - res = await data_reset_hack(reset_type=reset_type) + log.warning( + 'Sending DATA RESET request:\n' + f'{client}' + ) + res = await data_reset_hack( + vnc_host=client.host, + reset_type=reset_type, + ) if not res: log.warning( @@ -280,7 +287,7 @@ async def wait_on_data_reset( _data_resetter_task: trio.Task | None = None - +_failed_resets: int = 0 async def get_bars( @@ -299,6 +306,7 @@ async def get_bars( # history queries for instrument, presuming that most don't # not trade for a week XD max_nodatas: int = 6, + max_failed_resets: int = 6, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -308,7 +316,7 @@ async def get_bars( a ``MethoProxy``. ''' - global _data_resetter_task + global _data_resetter_task, _failed_resets nodatas_count: int = 0 data_cs: trio.CancelScope | None = None @@ -321,8 +329,11 @@ async def get_bars( result_ready = trio.Event() async def query(): + + global _failed_resets nonlocal result, data_cs, end_dt, nodatas_count - while True: + + while _failed_resets < max_failed_resets: try: out = await proxy.bars( fqsn=fqsn, @@ -382,49 +393,48 @@ async def get_bars( f'Symbol: {fqsn}', ) - elif err.code == 162: - if ( - 'HMDS query returned no data' in msg - ): - # XXX: this is now done in the storage mgmt - # layer and we shouldn't implicitly decrement - # the frame dt index since the upper layer may - # be doing so concurrently and we don't want to - # be delivering frames that weren't asked for. - # try to decrement start point and look further back - # end_dt = end_dt.subtract(seconds=2000) - logmsg = "SUBTRACTING DAY from DT index" - if end_dt is not None: - end_dt = end_dt.subtract(days=1) - elif end_dt is None: - end_dt = pendulum.now().subtract(days=1) + elif ( + 'HMDS query returned no data' in msg + ): + # XXX: this is now done in the storage mgmt + # layer and we shouldn't implicitly decrement + # the frame dt index since the upper layer may + # be doing so concurrently and we don't want to + # be delivering frames that weren't asked for. + # try to decrement start point and look further back + # end_dt = end_dt.subtract(seconds=2000) + logmsg = "SUBTRACTING DAY from DT index" + if end_dt is not None: + end_dt = end_dt.subtract(days=1) + elif end_dt is None: + end_dt = pendulum.now().subtract(days=1) - log.warning( - f'NO DATA found ending @ {end_dt}\n' - + logmsg + log.warning( + f'NO DATA found ending @ {end_dt}\n' + + logmsg + ) + + if nodatas_count >= max_nodatas: + raise DataUnavailable( + f'Presuming {fqsn} has no further history ' + f'after {max_nodatas} tries..' ) - if nodatas_count >= max_nodatas: - raise DataUnavailable( - f'Presuming {fqsn} has no further history ' - f'after {max_nodatas} tries..' - ) + nodatas_count += 1 + continue - nodatas_count += 1 - continue - - elif 'API historical data query cancelled' in err.message: - log.warning( - 'Query cancelled by IB (:eyeroll:):\n' - f'{err.message}' - ) - continue - elif ( - 'Trading TWS session is connected from a different IP' - in err.message - ): - log.warning("ignoring ip address warning") - continue + elif 'API historical data query cancelled' in err.message: + log.warning( + 'Query cancelled by IB (:eyeroll:):\n' + f'{err.message}' + ) + continue + elif ( + 'Trading TWS session is connected from a different IP' + in err.message + ): + log.warning("ignoring ip address warning") + continue # XXX: more or less same as above timeout case elif _pacing in msg: @@ -433,8 +443,11 @@ async def get_bars( 'Resetting farms with `ctrl-alt-f` hack\n' ) + client = proxy._aio_ns.ib.client + # cancel any existing reset task if data_cs: + log.cancel(f'Cancelling existing reset for {client}') data_cs.cancel() # spawn new data reset task @@ -442,10 +455,13 @@ async def get_bars( partial( wait_on_data_reset, proxy, - timeout=float('inf'), reset_type='connection' ) ) + if reset_done: + _failed_resets = 0 + else: + _failed_resets += 1 continue else: @@ -482,7 +498,7 @@ async def get_bars( partial( wait_on_data_reset, proxy, - timeout=float('inf'), + reset_type='data', ) ) # sync wait on reset to complete