Get ib data feed hackzorz workin

ib has a throttle limit for "hft" bars but contained in here is some
hackery using ``xdotool`` to reset data farms auto-magically B)

This copies the working script into the ib backend mod as a routine and
now uses `trio.run_process()` and calls into it from the `get_bars()`
history retriever and then waits for "data re-established" events to be
received from the client before making more history queries.

TL;DR summary of changes:
- relay ib's "system status" events (like for data farm statuses)
  as a new "event" msg that can be processed by registers of
  `Client.inline_errors()` (though we should probably make a new
  method for this).
- add `MethodProxy.status_event()` which allows a proxy user to register
  for a particular "system event" (as mentioned above), which puts
  a `trio.Event` entry in a small table can be set by an relay task if
  there are any detected waiters.
- start a "msg relay task" when opening the method proxy which does
  the event setting mentioned above in the background.
- drop the request error handling around the proxy creation, doesn't
  seem necessary any more now that we have better error propagation from
  `asyncio`.
- add event waiting logic around the data feed reset hackzorin.
- change the order relay task to only log system events for now (though
  we need to do some better parsing/logic to get tws-external order
  updates to work again..
broker_bumpz
Tyler Goodlet 2022-03-30 13:49:19 -04:00
parent 874374af06
commit b579d4b1f5
1 changed files with 255 additions and 78 deletions

View File

@ -294,7 +294,8 @@ class Client:
bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters} @ end={end_dt}')
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(f'REQUESTING BARS {_enters} @ end={end_dt}')
_enters += 1
contract = await self.find_contract(fqsn)
@ -304,6 +305,7 @@ class Client:
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime=end_dt,
formatDate=2,
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
@ -738,25 +740,38 @@ class Client:
def inline_errors(
self,
to_trio: trio.abc.SendChannel,
) -> None:
# connect error msgs
) -> None:
'''
Setup error relay to the provided ``trio`` mem chan such that
trio tasks can retreive and parse ``asyncio``-side API request
errors.
'''
def push_err(
reqId: int,
errorCode: int,
errorString: str,
contract: Contract,
) -> None:
log.error(errorString)
reason = errorString
if reqId == -1:
# it's a general event?
key = 'event'
else:
key = 'error'
try:
to_trio.send_nowait((
'error',
key,
# error "object"
{'reqid': reqId,
'reason': errorString,
'reason': reason,
'contract': contract}
))
except trio.BrokenResourceError:
@ -1123,9 +1138,11 @@ class MethodProxy:
def __init__(
self,
chan: to_asyncio.LinkedTaskChannel,
event_table: dict[str, trio.Event],
) -> None:
self.chan = chan
self.event_table = event_table
async def _run_method(
self,
@ -1140,19 +1157,44 @@ class MethodProxy:
'''
chan = self.chan
# send through method + ``kwargs: dict`` as pair
await chan.send((meth, kwargs))
while not chan.closed():
# send through method + ``kwargs: dict`` as pair
msg = await chan.receive()
# print(f'NEXT MSG: {msg}')
# TODO: py3.10 ``match:`` syntax B)
if 'result' in msg:
res = msg.get('result')
if res:
return res
err = msg.get('error')
if not err:
raise ValueError(f'Received unexpected asyncio msg {msg}')
elif 'exception' in msg:
err = msg.get('exception')
raise err
elif 'error' in msg:
etype, emsg = msg
log.warning(f'IB error relay: {emsg}')
continue
else:
log.warning(f'UNKNOWN IB MSG: {msg}')
def status_event(
self,
pattern: str,
) -> Union[dict[str, Any], trio.Event]:
ev = self.event_table.get(pattern)
if not ev or ev.is_set():
# print(f'inserting new data reset event item')
ev = self.event_table[pattern] = trio.Event()
return ev
async def wait_for_data_reset(self) -> None:
'''
Send hacker hot keys to ib program and wait
@ -1166,6 +1208,7 @@ class MethodProxy:
async def open_aio_client_method_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
event_consumers: dict[str, trio.Event],
) -> None:
@ -1177,7 +1220,7 @@ async def open_aio_client_method_relay(
to_trio.send_nowait(client)
# TODO: separate channel for error handling?
# client.inline_errors(to_trio)
client.inline_errors(to_trio)
# relay all method requests to ``asyncio``-side client and
# deliver back results
@ -1188,8 +1231,8 @@ async def open_aio_client_method_relay(
break
meth_name, kwargs = msg
meth = getattr(client, meth_name)
try:
resp = await meth(**kwargs)
# echo the msg back
@ -1201,19 +1244,25 @@ async def open_aio_client_method_relay(
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'error': err})
to_trio.send_nowait({'exception': err})
@acm
async def open_client_proxy() -> MethodProxy:
try:
async with to_asyncio.open_channel_from(
# try:
event_table = {}
async with (
to_asyncio.open_channel_from(
open_aio_client_method_relay,
) as (first, chan):
event_consumers=event_table,
) as (first, chan),
trio.open_nursery() as relay_n,
):
assert isinstance(first, Client)
proxy = MethodProxy(chan)
proxy = MethodProxy(chan, event_table)
# mock all remote methods on ib ``Client``.
for name, method in inspect.getmembers(
@ -1223,33 +1272,34 @@ async def open_client_proxy() -> MethodProxy:
continue
setattr(proxy, name, partial(proxy._run_method, meth=name))
async def relay_events():
async with chan.subscribe() as msg_stream:
async for msg in msg_stream:
if 'event' not in msg:
continue
# if 'event' in msg:
# wake up any system event waiters.
etype, status_msg = msg
reason = status_msg['reason']
ev = proxy.event_table.pop(reason, None)
if ev and ev.statistics().tasks_waiting:
log.info(f'Relaying ib status message: {msg}')
ev.set()
continue
relay_n.start_soon(relay_events)
yield proxy
# terminate asyncio side task
await chan.send(None)
except (
RequestError,
# BaseException,
)as err:
code = getattr(err, 'code', None)
if code:
msg = err.message
# TODO: retreive underlying ``ib_insync`` error?
if (
code == 162 and (
'HMDS query returned no data' in msg
or 'No market data permissions for' in msg
)
or code == 200
):
# these cases should not cause a task crash
log.warning(msg)
else:
raise
@acm
async def get_client(
@ -1378,6 +1428,12 @@ def normalize(
return data
_pacing: str = (
'Historical Market Data Service error '
'message:Historical data request pacing violation'
)
async def get_bars(
proxy: MethodProxy,
@ -1396,14 +1452,13 @@ async def get_bars(
fails = 0
bars: Optional[list] = None
in_throttle: bool = False
first_dt: datetime = None
last_dt: datetime = None
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10):
for _ in range(2):
try:
bars, bars_array = await proxy.bars(
fqsn=fqsn,
@ -1449,26 +1504,43 @@ async def get_bars(
continue
else:
log.exception(
"Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# live_ev = proxy.status_event(
# # 'Market data farm connection is OK:usfuture'
# 'Market data farm connection is OK:usfarm'
# )
# TODO: some kinda resp here that indicates success
# otherwise retry?
await data_reset_hack()
# TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here
# that restarts/resumes this task?
if not in_throttle:
await tractor.breakpoint()
# TODO: a while loop here if we timeout?
for name, ev in [
('history', hist_ev),
# ('live', live_ev),
]:
with trio.move_on_after(22) as cs:
await ev.wait()
log.info(f"{name} DATA RESET")
# TODO: wait on data con reset event
# then begin backfilling again.
# await proxy.wait_for_data()
if cs.cancelled_caught:
log.warning("reset hack failed on first try?")
# await tractor.breakpoint()
in_throttle = True
fails += 1
continue
else:
raise
return None, None
# else: # throttle wasn't fixed so error out immediately
@ -1520,8 +1592,7 @@ async def backfill_bars(
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
# count: int = 120,
count: int = 36,
count: int = 65,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -1566,9 +1637,6 @@ async def backfill_bars(
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
if fails is None or fails > 1:
break
if out == (None, None):
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and
@ -2222,18 +2290,26 @@ async def deliver_trade_events(
msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account]
if getattr(msg, 'reqid', 0) < -1:
elif event_name == 'event':
# it's a trade event generated by TWS usage.
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# it's either a general system status event or an external
# trade event?
log.info(f"TWS system status: \n{pformat(item)}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# TODO: support this again but needs parsing at the callback
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
continue
# msg.reqid = 'tws-' + str(-1 * reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg.broker_details['external_src'] = 'tws'
continue
# msg.broker_details['external_src'] = 'tws'
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
@ -2336,3 +2412,104 @@ async def open_symbol_search(
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)
async def data_reset_hack(
reset_type: str = 'data',
) -> None:
'''
Run key combos for resetting data feeds and yield back to caller
when complete.
This is a linux-only hack around:
https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
TODOs:
- a return type that hopefully determines if the hack was
successful.
- other OS support?
- integration with ``ib-gw`` run in docker + Xorg?
'''
# TODO: try out this lib instead, seems to be the most modern
# and usess the underlying lib:
# https://github.com/rshk/python-libxdo
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
try:
import i3ipc
except ImportError:
return False
log.warning('IB data hack no-supported on ur platformz')
i3 = i3ipc.Connection()
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
combos: dict[str, str] = {
# only required if we need a connection reset.
'connection': ('ctrl+alt+r', 12),
# data feed reset.
'data': ('ctrl+alt+f', 6)
}
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
key_combo, timeout = combos[reset_type]
# for key_combo, timeout in [
# # only required if we need a connection reset.
# # ('ctrl+alt+r', 12),
# # data feed reset.
# ('ctrl+alt+f', 6)
# ]:
await trio.run_process([
'xdotool',
'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# hackzorzes
'key', key_combo,
# ],
# timeout=timeout,
])
# re-activate and focus original window
await trio.run_process([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),
'click', '--window', str(orig_win_id), '1',
])
return True