diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 512e5358..2941284a 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -20,6 +20,11 @@ runnable script-programs. ''' from __future__ import annotations +from datetime import ( # noqa + datetime, + date, + tzinfo as TzInfo, +) from functools import partial from typing import ( Literal, @@ -75,7 +80,7 @@ def try_xdo_manual( return True except OSError: log.exception( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False @@ -124,7 +129,7 @@ async def data_reset_hack( if not vnc_sockaddr: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) @@ -153,7 +158,7 @@ async def data_reset_hack( import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False @@ -164,7 +169,7 @@ async def data_reset_hack( focussed, matches = i3ipc_fin_wins_titled() if not matches: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False else: @@ -337,3 +342,99 @@ def i3ipc_xdotool_manual_click_hack() -> None: ]) except subprocess.TimeoutExpired: log.exception('xdotool timed out?') + + + +def is_current_time_in_range( + start_dt: datetime, + end_dt: datetime, +) -> bool: + ''' + Check if current time is within the datetime range. + + Use any/the-same timezone as provided by `start_dt.tzinfo` value + in the range. + + ''' + now: datetime = datetime.now(start_dt.tzinfo) + return start_dt <= now <= end_dt + + +# TODO, put this into `._util` and call it from here! +# +# NOTE, this was generated by @guille from a gpt5 prompt +# and was originally thot to be needed before learning about +# `ib_insync.contract.ContractDetails._parseSessions()` and +# it's downstream meths.. +# +# This is still likely useful to keep for now to parse the +# `.tradingHours: str` value manually if we ever decide +# to move off `ib_async` and implement our own `trio`/`anyio` +# based version Bp +# +# >attempt to parse the retarted ib "time stampy thing" they +# >do for "venue hours" with this.. written by +# >gpt5-"thinking", +# + + +def parse_trading_hours( + spec: str, + tz: TzInfo|None = None +) -> dict[ + date, + tuple[datetime, datetime] +]|None: + ''' + Parse venue hours like: + 'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...' + + Returns `dict[date] = (open_dt, close_dt)` or `None` if + closed. + + ''' + if ( + not isinstance(spec, str) + or + not spec + ): + raise ValueError('spec must be a non-empty string') + + out: dict[ + date, + tuple[datetime, datetime] + ]|None = {} + + for part in (p.strip() for p in spec.split(';') if p.strip()): + if part.endswith(':CLOSED'): + day_s, _ = part.split(':', 1) + d = datetime.strptime(day_s, '%Y%m%d').date() + out[d] = None + continue + + try: + start_s, end_s = part.split('-', 1) + start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M') + end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M') + except ValueError as exc: + raise ValueError(f'invalid segment: {part}') from exc + + if tz is not None: + start_dt = start_dt.replace(tzinfo=tz) + end_dt = end_dt.replace(tzinfo=tz) + + out[start_dt.date()] = (start_dt, end_dt) + + return out + + +# ORIG desired usage, +# +# TODO, for non-drunk tomorrow, +# - call above fn and check that `output[today] is not None` +# trading_hrs: dict = parse_trading_hours( +# details.tradingHours +# ) +# liq_hrs: dict = parse_trading_hours( +# details.liquidHours + # ) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 2c1a9224..90002fae 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -26,7 +26,6 @@ from dataclasses import asdict from datetime import datetime from functools import partial from pprint import pformat -from math import isnan import time from typing import ( Any, @@ -69,7 +68,10 @@ from .api import ( Contract, RequestError, ) -from ._util import data_reset_hack +from ._util import ( + data_reset_hack, + is_current_time_in_range, +) from .symbols import get_mkt_info if TYPE_CHECKING: @@ -184,7 +186,8 @@ async def open_history_client( if ( start_dt - and start_dt.timestamp() == 0 + and + start_dt.timestamp() == 0 ): await tractor.pause() @@ -203,7 +206,7 @@ async def open_history_client( ): count += 1 mean += latency / count - print( + log.debug( f'HISTORY FRAME QUERY LATENCY: {latency}\n' f'mean: {mean}' ) @@ -607,7 +610,10 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as nurse: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as nurse + ): # start history request that we allow # to run indefinitely until a result is acquired @@ -689,10 +695,17 @@ async def _setup_quote_stream( async with load_aio_clients( disconnect_on_exit=False, ) as accts2clients: + + # since asyncio.Task + # tractor.pause_from_sync() + caccount_name, client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) to_trio.send_nowait(contract) # cuz why not - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + ticker: Ticker = client.ib.reqMktData( + contract, + ','.join(opts), + ) # NOTE: it's batch-wise and slow af but I guess could # be good for backchecking? Seems to be every 5s maybe? @@ -716,10 +729,10 @@ async def _setup_quote_stream( Push quotes to trio task. """ - # log.debug(t) + + # log.debug(f'new IB quote: {t}\n') try: to_trio.send_nowait(t) - except ( trio.BrokenResourceError, @@ -734,21 +747,47 @@ async def _setup_quote_stream( # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. teardown() - except trio.WouldBlock: - # log.warning( - # f'channel is blocking symbol feed for {symbol}?' - # f'\n{to_trio.statistics}' - # ) - pass - # except trio.WouldBlock: - # # for slow debugging purposes to avoid clobbering prompt - # # with log msgs - # pass + # for slow debugging purposes to avoid clobbering prompt + # with log msgs + except trio.WouldBlock: + log.exception( + f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n' + f'\n' + f'{to_trio.statistics()}\n' + ) + + # ?TODO, handle re-connection attempts? + except BaseException as _berr: + berr = _berr + log.exception( + f'Failed to push ticker quote !?\n' + f'cause: {berr}\n' + f'\n' + f't: {t}\n' + f'{to_trio.statistics}\n' + ) + # raise berr + ticker.updateEvent.connect(push) try: await asyncio.sleep(float('inf')) + + # XXX, just for debug.. + # tractor.pause_from_sync() + # while True: + # await asyncio.sleep(1.6) + # if ticker.ticks: + # log.debug( + # f'ticker.ticks = \n' + # f'{ticker.ticks}\n' + # ) + # else: + # log.warning( + # 'UHH no ticker.ticks ??' + # ) + finally: teardown() @@ -820,7 +859,7 @@ def normalize( tbt = ticker.tickByTicks if tbt: - print(f'tickbyticks:\n {ticker.tickByTicks}') + log.info(f'tickbyticks:\n {ticker.tickByTicks}') ticker.ticks = new_ticks @@ -861,22 +900,28 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, + + # TODO? we need to hook into the `ib_async` logger like + # we can with i3ipc from modden! + # loglevel: str|None = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' - Stream symbol quotes. + Stream `symbols[0]` quotes back via `send_chan`. - This is a ``trio`` callable routine meant to be invoked - once the brokerd is up. + The `feed_is_live: Event` is set to signal the caller that it can + begin processing msgs from the mem-chan. ''' # TODO: support multiple subscriptions - sym = symbols[0] - log.info(f'request for real-time quotes: {sym}') + sym: str = symbols[0] + log.info( + f'request for real-time quotes\n' + f'sym: {sym!r}\n' + ) init_msgs: list[FeedInit] = [] @@ -885,34 +930,49 @@ async def stream_quotes( details: ibis.ContractDetails async with ( open_data_client() as proxy, - # trio.open_nursery() as tn, ): mkt, details = await get_mkt_info( sym, proxy=proxy, # passed to avoid implicit client load ) + # is venue active rn? + venue_is_open: bool = any( + is_current_time_in_range( + start_dt=sesh.start, + end_dt=sesh.end, + ) + for sesh in details.tradingSessions() + ) + init_msg = FeedInit(mkt_info=mkt) + # NOTE, tell sampler (via config) to skip vlm summing for dst + # assets which provide no vlm data.. if mkt.dst.atype in { 'fiat', 'index', 'commodity', }: - # tell sampler config that it shouldn't do vlm summing. init_msg.shm_write_opts['sum_tick_vlm'] = False init_msg.shm_write_opts['has_vlm'] = False init_msgs.append(init_msg) con: Contract = details.contract - first_ticker: Ticker | None = None - with trio.move_on_after(1): + first_ticker: Ticker|None = None + + with trio.move_on_after(1.6) as quote_cs: first_ticker: Ticker = await proxy.get_quote( contract=con, raise_on_timeout=False, ) + # XXX should never happen with this ep right? + # but if so then, more then likely mkt is closed? + if quote_cs.cancelled_caught: + await tractor.pause() + if first_ticker: first_quote: dict = normalize(first_ticker) @@ -924,28 +984,27 @@ async def stream_quotes( f'{pformat(first_quote)}\n' ) - # NOTE: it might be outside regular trading hours for - # assets with "standard venue operating hours" so we - # only "pretend the feed is live" when the dst asset - # type is NOT within the NON-NORMAL-venue set: aka not - # commodities, forex or crypto currencies which CAN - # always return a NaN on a snap quote request during - # normal venue hours. In the case of a closed venue - # (equitiies, futes, bonds etc.) we at least try to - # grab the OHLC history. - if ( - first_ticker - and - isnan(first_ticker.last) - # SO, if the last quote price value is NaN we ONLY - # "pretend to do" `feed_is_live.set()` if it's a known - # dst asset venue with a lot of closed operating hours. - and mkt.dst.atype not in { - 'commodity', - 'fiat', - 'crypto', - } - ): + # XXX NOTE: whenever we're "outside regular trading hours" + # (only relevant for assets coming from the "legacy markets" + # space) so we basically (from an API/runtime-operational + # perspective) "pretend the feed is live" even if it's + # actually closed. + # + # IOW, we signal to the effective caller (task) that the live + # feed is "already up" but really we're just indicating that + # the OHLCV history can start being loaded immediately by the + # `piker.data`/`.tsp` layers. + # + # XXX, deats: the "pretend we're live" is just done by + # a `feed_is_live.set()` even though nothing is actually live + # Bp + if not venue_is_open: + log.warning( + f'Venue is closed, unable to establish real-time feed.\n' + f'mkt: {mkt!r}\n' + f'\n' + f'first_ticker: {first_ticker}\n' + ) task_status.started(( init_msgs, first_quote, @@ -956,10 +1015,12 @@ async def stream_quotes( feed_is_live.set() # block and let data history backfill code run. + # XXX obvi given the venue is closed, we never expect feed + # to come up; a taskc should be the only way to + # terminate this task. await trio.sleep_forever() - return # we never expect feed to come up? - # TODO: we should instead spawn a task that waits on a feed + # ?TODO, we could instead spawn a task that waits on a feed # to start and let it wait indefinitely..instead of this # hard coded stuff. # async def wait_for_first_quote(): @@ -981,23 +1042,26 @@ async def stream_quotes( 'Rxed init quote:\n' f'{pformat(first_quote)}' ) - cs: trio.CancelScope | None = None + cs: trio.CancelScope|None = None startup: bool = True while ( startup - or cs.cancel_called + or + cs.cancel_called ): with trio.CancelScope() as cs: async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as nurse, open_aio_quote_stream( symbol=sym, contract=con, ) as stream, ): + # ?TODO? can we rm this - particularly for `ib_async`? # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + # first_ticker.ticks = [] # only on first entry at feed boot up if startup: @@ -1011,8 +1075,8 @@ async def stream_quotes( # data feed event. async def reset_on_feed(): - # TODO: this seems to be surpressed from the - # traceback in ``tractor``? + # ??TODO? this seems to be surpressed from the + # traceback in `tractor`? # assert 0 rt_ev = proxy.status_event( @@ -1056,7 +1120,7 @@ async def stream_quotes( # ugh, clear ticks since we've # consumed them (ahem, ib_insync is # truly stateful trash) - ticker.ticks = [] + # ticker.ticks = [] # XXX: this works because we don't use # ``aclosing()`` above? @@ -1073,8 +1137,12 @@ async def stream_quotes( async for ticker in stream: quote = normalize(ticker) fqme = quote['fqme'] + log.debug( + f'Sending quote\n' + f'{quote}' + ) await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ticker.ticks = [] # last = time.time()