ib: support remote host vnc client connections

I figure we might as well support multiple types of distributed
multi-host setups; why not allow running the API (gateway), and thus the
vnc server, on a different host and let clients connect and do their
thing B)

Deatz:
- make `ib._util.data_reset_hack()` take in a `vnc_host` which gets
  proxied through to the `asyncvnc` client.
- pull the `ib_insync.client.Client` host value and pass it through to
  the data reset machinery, presuming the vnc server is running in the
  same container (and/or on the same host); see the sketch below.
- if no vnc connection **and** no i3ipc trick can be used, just report
  to the user that they need to remove the data throttle manually.
- fix `feed.get_bars()` to handle throttle cases the same way based on
  error msg matching, not the error code, and add a max `_failed_resets`
  count to trigger bailing on the query loop.
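
Roughly, the new plumbing looks like the following sketch (hedged: the
helper name and wiring here are made up for illustration; only the
`data_reset_hack()` kwargs and the `client.host` lookup come from this
patch):

    # illustrative only: how the (possibly remote) gateway host flows from
    # the `ib_insync` API connection through to the reset hack.
    from ib_insync import IB
    from piker.brokers.ib._util import data_reset_hack

    async def reset_remote_data_farms(ib: IB) -> bool:
        # reuse whatever host the API client connected to, presuming the
        # vnc server runs in the same container/host as the gateway.
        vnc_host: str = ib.client.host

        # NB: the hack returns True/False even though it's annotated
        # `-> None` upstream.
        return await data_reset_hack(
            vnc_host=vnc_host,
            reset_type='data',  # or 'connection'
        )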
branch: master
author: Tyler Goodlet, 2023-05-12 09:41:45 -04:00
parent: 7ff8aa1ba0
commit: 957224bdc5
2 changed files with 103 additions and 60 deletions

Changed file 1 of 2: `ib._util`

@@ -32,7 +32,10 @@ import tractor
 from .._util import log
 
 if TYPE_CHECKING:
-    from .api import MethodProxy
+    from .api import (
+        MethodProxy,
+        ib_Client
+    )
 
 
 _reset_tech: Literal[
@@ -47,9 +50,8 @@ _reset_tech: Literal[
 
 
 async def data_reset_hack(
-    proxy: MethodProxy,
-    reset_type: str = 'data',
-    **kwargs,
+    vnc_host: str,
+    reset_type: Literal['data', 'connection'],
 
 ) -> None:
     '''
@@ -79,9 +81,13 @@ async def data_reset_hack(
     that need to be wrangle.
 
     '''
-    global _reset_tech
-    client: 'IBCLIENTTHING' = proxy._aio_ns.ib.client
+    no_setup_msg:str = (
+        'No data reset hack test setup for {vnc_host}!\n'
+        'See setup @\n'
+        'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
+    )
+    global _reset_tech
 
     match _reset_tech:
         case 'vnc':
@@ -89,15 +95,26 @@ async def data_reset_hack(
                 await tractor.to_asyncio.run_task(
                     partial(
                         vnc_click_hack,
-                        host=client.host,
+                        host=vnc_host,
                     )
                 )
             except OSError:
-                _reset_tech = 'i3ipc_xdotool'
+                if vnc_host != 'localhost':
+                    log.warning(no_setup_msg)
+                    return False
+
+                try:
+                    import i3ipc
+                except ModuleNotFoundError:
+                    log.warning(no_setup_msg)
+                    return False
+
                 try:
                     i3ipc_xdotool_manual_click_hack()
+                    _reset_tech = 'i3ipc_xdotool'
                     return True
                 except OSError:
+                    log.exception(no_setup_msg)
                     return False
 
         case 'i3ipc_xdotool':
@@ -119,9 +136,21 @@ async def vnc_click_hack(
     ib gateway using magic combos.
 
     '''
-    key = {'data': 'f', 'connection': 'r'}[reset_type]
+    try:
+        import asyncvnc
+    except ModuleNotFoundError:
+        log.warning(
+            "In order to leverage `piker`'s built-in data reset hacks, install "
+            "the `asyncvnc` project: https://github.com/barneygale/asyncvnc"
+        )
+        return
 
-    import asyncvnc
+    # two different hot keys which trigger diff types of reset
+    # requests B)
+    key = {
+        'data': 'f',
+        'connection': 'r'
+    }[reset_type]
 
     async with asyncvnc.connect(
         host,
@@ -140,8 +169,6 @@ async def vnc_click_hack(
 
 
 def i3ipc_xdotool_manual_click_hack() -> None:
-    import i3ipc
-
     i3 = i3ipc.Connection()
     t = i3.get_tree()
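
For context, the vnc leg of the hack boils down to roughly the following
standalone snippet; the port, password and connection details are
placeholders (only the `asyncvnc` usage and the ctrl-alt-f / ctrl-alt-r
combos are referenced by the patch):

    import asyncio
    import asyncvnc  # https://github.com/barneygale/asyncvnc

    async def send_reset_combo(
        host: str,
        port: int = 3003,  # placeholder: whatever port your vnc server exposes
        password: str = 'changeme',  # placeholder credentials
        key: str = 'f',  # 'f' -> data farm reset, 'r' -> connection reset
    ) -> None:
        # connect to the vnc server running alongside ib-gateway (possibly
        # on a remote host) and send the ctrl-alt-<key> reset combo.
        async with asyncvnc.connect(host, port, password=password) as client:
            client.keyboard.press('Ctrl', 'Alt', key)

    if __name__ == '__main__':
        asyncio.run(send_reset_combo('localhost'))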

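The `i3ipc_xdotool_manual_click_hack()` fallback (only reachable when the
vnc route fails and the gateway is local) amounts to something like this
sketch; the window-title match and key combo are illustrative, not lifted
from the patch:

    import subprocess
    import i3ipc  # i3 window-manager IPC bindings

    def focus_gateway_and_send_reset(title_match: str = 'IB Gateway') -> None:
        # walk the i3 window tree, focus any window whose title matches the
        # gateway, then fire the reset key combo at it via `xdotool`.
        i3 = i3ipc.Connection()
        tree = i3.get_tree()
        for win in tree.find_titled(title_match):
            win.command('focus')
            subprocess.run(
                ['xdotool', 'key', '--window', str(win.window), 'ctrl+alt+f'],
                check=False,
            )
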
Changed file 2 of 2: `ib.feed`

@@ -212,7 +212,7 @@ _pacing: str = (
 async def wait_on_data_reset(
     proxy: MethodProxy,
     reset_type: str = 'data',
-    timeout: float = 16,
+    timeout: float = float('inf'),
 
     task_status: TaskStatus[
         tuple[
@@ -228,7 +228,7 @@ async def wait_on_data_reset(
         'HMDS data farm connection is OK:ushmds'
     )
 
-    # XXX: other event messages we might want to try and
+    # TODO: other event messages we might want to try and
     # wait for but i wasn't able to get any of this
     # reliable..
     # reconnect_start = proxy.status_event(
@@ -239,14 +239,21 @@ async def wait_on_data_reset(
     # )
 
     # try to wait on the reset event(s) to arrive, a timeout
     # will trigger a retry up to 6 times (for now).
 
+    client = proxy._aio_ns.ib.client
     done = trio.Event()
     with trio.move_on_after(timeout) as cs:
         task_status.started((cs, done))
 
-        log.warning('Sending DATA RESET request')
-        res = await data_reset_hack(reset_type=reset_type)
+        log.warning(
+            'Sending DATA RESET request:\n'
+            f'{client}'
+        )
+        res = await data_reset_hack(
+            vnc_host=client.host,
+            reset_type=reset_type,
+        )
 
         if not res:
             log.warning(
@@ -280,7 +287,7 @@ async def wait_on_data_reset(
 
 
 _data_resetter_task: trio.Task | None = None
-
+_failed_resets: int = 0
 
 
 async def get_bars(
@@ -299,6 +306,7 @@ async def get_bars(
     # history queries for instrument, presuming that most don't
     # not trade for a week XD
     max_nodatas: int = 6,
+    max_failed_resets: int = 6,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -308,7 +316,7 @@ async def get_bars(
     a ``MethoProxy``.
 
     '''
-    global _data_resetter_task
+    global _data_resetter_task, _failed_resets
 
     nodatas_count: int = 0
     data_cs: trio.CancelScope | None = None
@@ -321,8 +329,11 @@ async def get_bars(
     result_ready = trio.Event()
 
     async def query():
+
+        global _failed_resets
         nonlocal result, data_cs, end_dt, nodatas_count
-        while True:
+
+        while _failed_resets < max_failed_resets:
             try:
                 out = await proxy.bars(
                     fqsn=fqsn,
@@ -382,49 +393,48 @@ async def get_bars(
                         f'Symbol: {fqsn}',
                     )
 
-                elif err.code == 162:
-                    if (
-                        'HMDS query returned no data' in msg
-                    ):
-                        # XXX: this is now done in the storage mgmt
-                        # layer and we shouldn't implicitly decrement
-                        # the frame dt index since the upper layer may
-                        # be doing so concurrently and we don't want to
-                        # be delivering frames that weren't asked for.
-                        # try to decrement start point and look further back
-                        # end_dt = end_dt.subtract(seconds=2000)
-                        logmsg = "SUBTRACTING DAY from DT index"
-                        if end_dt is not None:
-                            end_dt = end_dt.subtract(days=1)
-                        elif end_dt is None:
-                            end_dt = pendulum.now().subtract(days=1)
-
-                        log.warning(
-                            f'NO DATA found ending @ {end_dt}\n'
-                            + logmsg
-                        )
-
-                        if nodatas_count >= max_nodatas:
-                            raise DataUnavailable(
-                                f'Presuming {fqsn} has no further history '
-                                f'after {max_nodatas} tries..'
-                            )
-
-                        nodatas_count += 1
-                        continue
-
-                    elif 'API historical data query cancelled' in err.message:
-                        log.warning(
-                            'Query cancelled by IB (:eyeroll:):\n'
-                            f'{err.message}'
-                        )
-                        continue
-                    elif (
-                        'Trading TWS session is connected from a different IP'
-                        in err.message
-                    ):
-                        log.warning("ignoring ip address warning")
-                        continue
+                elif (
+                    'HMDS query returned no data' in msg
+                ):
+                    # XXX: this is now done in the storage mgmt
+                    # layer and we shouldn't implicitly decrement
+                    # the frame dt index since the upper layer may
+                    # be doing so concurrently and we don't want to
+                    # be delivering frames that weren't asked for.
+                    # try to decrement start point and look further back
+                    # end_dt = end_dt.subtract(seconds=2000)
+                    logmsg = "SUBTRACTING DAY from DT index"
+                    if end_dt is not None:
+                        end_dt = end_dt.subtract(days=1)
+                    elif end_dt is None:
+                        end_dt = pendulum.now().subtract(days=1)
+
+                    log.warning(
+                        f'NO DATA found ending @ {end_dt}\n'
+                        + logmsg
+                    )
+
+                    if nodatas_count >= max_nodatas:
+                        raise DataUnavailable(
+                            f'Presuming {fqsn} has no further history '
+                            f'after {max_nodatas} tries..'
+                        )
+
+                    nodatas_count += 1
+                    continue
+
+                elif 'API historical data query cancelled' in err.message:
+                    log.warning(
+                        'Query cancelled by IB (:eyeroll:):\n'
+                        f'{err.message}'
+                    )
+                    continue
+
+                elif (
+                    'Trading TWS session is connected from a different IP'
+                    in err.message
+                ):
+                    log.warning("ignoring ip address warning")
+                    continue
 
                 # XXX: more or less same as above timeout case
                 elif _pacing in msg:
@@ -433,8 +443,11 @@ async def get_bars(
                         'Resetting farms with `ctrl-alt-f` hack\n'
                     )
 
+                    client = proxy._aio_ns.ib.client
+
                    # cancel any existing reset task
                     if data_cs:
+                        log.cancel(f'Cancelling existing reset for {client}')
                         data_cs.cancel()
 
                     # spawn new data reset task
@@ -442,10 +455,13 @@ async def get_bars(
                         partial(
                             wait_on_data_reset,
                             proxy,
-                            timeout=float('inf'),
                             reset_type='connection'
                         )
                     )
+                    if reset_done:
+                        _failed_resets = 0
+                    else:
+                        _failed_resets += 1
 
                     continue
                 else:
@@ -482,7 +498,7 @@ async def get_bars(
             partial(
                 wait_on_data_reset,
                 proxy,
-                timeout=float('inf'),
+                reset_type='data',
             )
         )
         # sync wait on reset to complete
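
The net behavioral change from the new `_failed_resets` counter is a
bounded retry loop, roughly modeled by this sketch (the stand-in
callables are hypothetical, not the real query machinery):

    from typing import Awaitable, Callable

    async def query_with_reset_budget(
        run_query: Callable[[], Awaitable[bool]],  # True once a frame arrives
        request_reset: Callable[[], Awaitable[bool]],  # True when reset completed
        max_failed_resets: int = 6,
    ) -> bool:
        failed_resets: int = 0
        while failed_resets < max_failed_resets:
            if await run_query():
                return True

            # the query hit the data-farm throttle: trigger a reset and only
            # clear the failure budget when the reset actually completed.
            if await request_reset():
                failed_resets = 0
            else:
                failed_resets += 1

        # budget exhausted: bail out instead of hammering the gateway
        # forever (mirrors the new `while` condition in `query()`).
        return False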