ib: add venue-hours checking

Such that we can avoid other (pretty unreliable) "alternative" checks to
determine whether a real-time quote should be waited on or (when venue
is closed) we should just signal that historical backfilling can
commence immediately.

This has been a todo for a very long time and it turned out to be much
easier to accomplish than anticipated..

Deats,
- add a new `is_current_time_in_range()` dt range checker to predicate
  whether an input range contains `datetime.now(start_dt.tzinfo)`.
- in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses
  all of the new ^^ to determine whether to branch for the
  short-circuit-and-do-history-now-case or the std real-time-quotes
  should-be-awaited-since-venue-is-open, case; drop all the old hacks
  trying to workaround not figuring that venue state stuff..

Other,
- also add a gpt5 composed parser to `._util` for the
  `ib_insync.ContractDetails.tradingHours: str` for before i realized
  there was a `.tradingSessions` property XD
- in `.ib_feed`,
  * add various EG-collapsings per recent tractor/trio updates.
  * better logging / exc-handling around ticker quote pushes.
  * stop clearing `Ticker.ticks` each quote iteration; not sure if this
    is needed/correct tho?
  * add masked `Ticker.ticks` poll loop that logs.
- fix some `str.format()` usage in `._util.try_xdo_manual()`
testing_utils
Tyler Goodlet 2025-09-20 22:13:59 -04:00
parent 390a57c96d
commit e19a724037
2 changed files with 233 additions and 64 deletions

View File

@ -20,6 +20,11 @@ runnable script-programs.
'''
from __future__ import annotations
from datetime import ( # noqa
datetime,
date,
tzinfo as TzInfo,
)
from functools import partial
from typing import (
Literal,
@ -75,7 +80,7 @@ def try_xdo_manual(
return True
except OSError:
log.exception(
no_setup_msg.format(vnc_sockaddr)
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
@ -124,7 +129,7 @@ async def data_reset_hack(
if not vnc_sockaddr:
log.warning(
no_setup_msg.format(vnc_sockaddr)
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
+
'REQUIRES A `vnc_addrs: array` ENTRY'
)
@ -153,7 +158,7 @@ async def data_reset_hack(
import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError:
log.warning(
no_setup_msg.format(vnc_sockaddr)
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
@ -164,7 +169,7 @@ async def data_reset_hack(
focussed, matches = i3ipc_fin_wins_titled()
if not matches:
log.warning(
no_setup_msg.format(vnc_sockaddr)
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
else:
@ -337,3 +342,99 @@ def i3ipc_xdotool_manual_click_hack() -> None:
])
except subprocess.TimeoutExpired:
log.exception('xdotool timed out?')
def is_current_time_in_range(
start_dt: datetime,
end_dt: datetime,
) -> bool:
'''
Check if current time is within the datetime range.
Use any/the-same timezone as provided by `start_dt.tzinfo` value
in the range.
'''
now: datetime = datetime.now(start_dt.tzinfo)
return start_dt <= now <= end_dt
# TODO, put this into `._util` and call it from here!
#
# NOTE, this was generated by @guille from a gpt5 prompt
# and was originally thot to be needed before learning about
# `ib_insync.contract.ContractDetails._parseSessions()` and
# it's downstream meths..
#
# This is still likely useful to keep for now to parse the
# `.tradingHours: str` value manually if we ever decide
# to move off `ib_async` and implement our own `trio`/`anyio`
# based version Bp
#
# >attempt to parse the retarted ib "time stampy thing" they
# >do for "venue hours" with this.. written by
# >gpt5-"thinking",
#
def parse_trading_hours(
spec: str,
tz: TzInfo|None = None
) -> dict[
date,
tuple[datetime, datetime]
]|None:
'''
Parse venue hours like:
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
Returns `dict[date] = (open_dt, close_dt)` or `None` if
closed.
'''
if (
not isinstance(spec, str)
or
not spec
):
raise ValueError('spec must be a non-empty string')
out: dict[
date,
tuple[datetime, datetime]
]|None = {}
for part in (p.strip() for p in spec.split(';') if p.strip()):
if part.endswith(':CLOSED'):
day_s, _ = part.split(':', 1)
d = datetime.strptime(day_s, '%Y%m%d').date()
out[d] = None
continue
try:
start_s, end_s = part.split('-', 1)
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
except ValueError as exc:
raise ValueError(f'invalid segment: {part}') from exc
if tz is not None:
start_dt = start_dt.replace(tzinfo=tz)
end_dt = end_dt.replace(tzinfo=tz)
out[start_dt.date()] = (start_dt, end_dt)
return out
# ORIG desired usage,
#
# TODO, for non-drunk tomorrow,
# - call above fn and check that `output[today] is not None`
# trading_hrs: dict = parse_trading_hours(
# details.tradingHours
# )
# liq_hrs: dict = parse_trading_hours(
# details.liquidHours
# )

View File

@ -26,7 +26,6 @@ from dataclasses import asdict
from datetime import datetime
from functools import partial
from pprint import pformat
from math import isnan
import time
from typing import (
Any,
@ -69,7 +68,10 @@ from .api import (
Contract,
RequestError,
)
from ._util import data_reset_hack
from ._util import (
data_reset_hack,
is_current_time_in_range,
)
from .symbols import get_mkt_info
if TYPE_CHECKING:
@ -184,7 +186,8 @@ async def open_history_client(
if (
start_dt
and start_dt.timestamp() == 0
and
start_dt.timestamp() == 0
):
await tractor.pause()
@ -203,7 +206,7 @@ async def open_history_client(
):
count += 1
mean += latency / count
print(
log.debug(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
@ -607,7 +610,10 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# start history request that we allow
# to run indefinitely until a result is acquired
@ -689,10 +695,17 @@ async def _setup_quote_stream(
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
# since asyncio.Task
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
@ -716,10 +729,10 @@ async def _setup_quote_stream(
Push quotes to trio task.
"""
# log.debug(t)
# log.debug(f'new IB quote: {t}\n')
try:
to_trio.send_nowait(t)
except (
trio.BrokenResourceError,
@ -734,21 +747,47 @@ async def _setup_quote_stream(
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
except trio.WouldBlock:
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
# for slow debugging purposes to avoid clobbering prompt
# with log msgs
except trio.WouldBlock:
log.exception(
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
f'\n'
f'{to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
log.exception(
f'Failed to push ticker quote !?\n'
f'cause: {berr}\n'
f'\n'
f't: {t}\n'
f'{to_trio.statistics}\n'
)
# raise berr
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
# XXX, just for debug..
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
# if ticker.ticks:
# log.debug(
# f'ticker.ticks = \n'
# f'{ticker.ticks}\n'
# )
# else:
# log.warning(
# 'UHH no ticker.ticks ??'
# )
finally:
teardown()
@ -820,7 +859,7 @@ def normalize(
tbt = ticker.tickByTicks
if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}')
log.info(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
@ -861,22 +900,28 @@ async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# TODO? we need to hook into the `ib_async` logger like
# we can with i3ipc from modden!
# loglevel: str|None = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream symbol quotes.
Stream `symbols[0]` quotes back via `send_chan`.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
The `feed_is_live: Event` is set to signal the caller that it can
begin processing msgs from the mem-chan.
'''
# TODO: support multiple subscriptions
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
sym: str = symbols[0]
log.info(
f'request for real-time quotes\n'
f'sym: {sym!r}\n'
)
init_msgs: list[FeedInit] = []
@ -885,34 +930,49 @@ async def stream_quotes(
details: ibis.ContractDetails
async with (
open_data_client() as proxy,
# trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info(
sym,
proxy=proxy, # passed to avoid implicit client load
)
# is venue active rn?
venue_is_open: bool = any(
is_current_time_in_range(
start_dt=sesh.start,
end_dt=sesh.end,
)
for sesh in details.tradingSessions()
)
init_msg = FeedInit(mkt_info=mkt)
# NOTE, tell sampler (via config) to skip vlm summing for dst
# assets which provide no vlm data..
if mkt.dst.atype in {
'fiat',
'index',
'commodity',
}:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker|None = None
with trio.move_on_after(1.6) as quote_cs:
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
# XXX should never happen with this ep right?
# but if so then, more then likely mkt is closed?
if quote_cs.cancelled_caught:
await tractor.pause()
if first_ticker:
first_quote: dict = normalize(first_ticker)
@ -924,28 +984,27 @@ async def stream_quotes(
f'{pformat(first_quote)}\n'
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in {
'commodity',
'fiat',
'crypto',
}
):
# XXX NOTE: whenever we're "outside regular trading hours"
# (only relevant for assets coming from the "legacy markets"
# space) so we basically (from an API/runtime-operational
# perspective) "pretend the feed is live" even if it's
# actually closed.
#
# IOW, we signal to the effective caller (task) that the live
# feed is "already up" but really we're just indicating that
# the OHLCV history can start being loaded immediately by the
# `piker.data`/`.tsp` layers.
#
# XXX, deats: the "pretend we're live" is just done by
# a `feed_is_live.set()` even though nothing is actually live
# Bp
if not venue_is_open:
log.warning(
f'Venue is closed, unable to establish real-time feed.\n'
f'mkt: {mkt!r}\n'
f'\n'
f'first_ticker: {first_ticker}\n'
)
task_status.started((
init_msgs,
first_quote,
@ -956,10 +1015,12 @@ async def stream_quotes(
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
@ -981,23 +1042,26 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None
cs: trio.CancelScope|None = None
startup: bool = True
while (
startup
or cs.cancel_called
or
cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
):
# ?TODO? can we rm this - particularly for `ib_async`?
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
@ -1011,8 +1075,8 @@ async def stream_quotes(
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# ??TODO? this seems to be surpressed from the
# traceback in `tractor`?
# assert 0
rt_ev = proxy.status_event(
@ -1056,7 +1120,7 @@ async def stream_quotes(
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = []
# ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
@ -1073,8 +1137,12 @@ async def stream_quotes(
async for ticker in stream:
quote = normalize(ticker)
fqme = quote['fqme']
log.debug(
f'Sending quote\n'
f'{quote}'
)
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# ticker.ticks = []
# last = time.time()