From a9d42b374f18075e13de90ad3c808917c1e74453 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 5 Feb 2022 16:48:12 -0500 Subject: [PATCH] ib: Allow history backfilling even when markets are closed --- piker/brokers/ib.py | 95 ++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 36 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 4dcf7b14..1a1277c6 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -507,6 +507,7 @@ class Client: async def get_quote( self, symbol: str, + ) -> tuple[Contract, Ticker, ContractDetails]: ''' Return a single quote for symbol. @@ -515,10 +516,14 @@ class Client: contract, ticker, details = await self.get_sym_details(symbol) # ensure a last price gets filled in before we deliver quote - for _ in range(25): + for _ in range(2): if isnan(ticker.last): + log.warning(f'Quote for {symbol} timed out: market is closed?') ticker = await ticker.updateEvent - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) + else: + log.info(f'Got first quote for {symbol}') + break else: log.warning( f'Symbol {symbol} is not returning a quote ' @@ -1311,7 +1316,8 @@ async def _setup_quote_stream( # Manually do the dereg ourselves. teardown() except trio.WouldBlock: - log.warning(f'channel is blocking symbol feed for {symbol}?' + log.warning( + f'channel is blocking symbol feed for {symbol}?' f'\n{to_trio.statistics}' ) @@ -1372,12 +1378,13 @@ async def stream_quotes( task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Stream symbol quotes. + ''' + Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. - """ + ''' # TODO: support multiple subscriptions sym = symbols[0] @@ -1418,8 +1425,6 @@ async def stream_quotes( return init_msgs if cs.cancelled_caught: - # it might be outside regular trading hours so see if we can at - # least grab history. contract, first_ticker, details = await _trio_run_client_method( method='get_sym_details', @@ -1435,42 +1440,60 @@ async def stream_quotes( symbol=sym, ) + # else: + init_msgs = mk_init_msgs() + + con = first_ticker.contract + + # should be real volume for this contract by default + calc_price = False + + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + else: - init_msgs = mk_init_msgs() + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], suffix)).lower() + quote['symbol'] = topic + + # pass first quote asap + first_quote = {topic: quote} + + if isnan(first_ticker.last): + # it might be outside regular trading hours so see if we can at + # least grab history. + # quote = normalize(first_ticker, calc_price=calc_price) + # con = quote['contract'] + # topic = '.'.join((con['symbol'], suffix)).lower() + # quote['symbol'] = topic + + # # pass first quote asap + # first_quote = {topic: quote} + task_status.started((init_msgs, first_quote)) + + # it's not really live but this will unblock + # the brokerd feed task to tell the ui to update? + feed_is_live.set() + + # block and let data history backfill code run. + await trio.sleep_forever() + return # we never expect feed to come up? - # stream = await start_aio_quote_stream(symbol=sym, contract=contract) async with open_aio_quote_stream( symbol=sym, contract=contract ) as stream: - con = first_ticker.contract - - # should be real volume for this contract by default - calc_price = False - - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange - - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True - - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], suffix)).lower() - quote['symbol'] = topic - - # pass first quote asap - first_quote = {topic: quote} - # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = []