ib: Allow history backfilling even when markets are closed

ib_mkt_closed
Tyler Goodlet 2022-02-05 16:48:12 -05:00
parent 43b39d3b6b
commit a9d42b374f
1 changed files with 59 additions and 36 deletions

View File

@ -507,6 +507,7 @@ class Client:
async def get_quote( async def get_quote(
self, self,
symbol: str, symbol: str,
) -> tuple[Contract, Ticker, ContractDetails]: ) -> tuple[Contract, Ticker, ContractDetails]:
''' '''
Return a single quote for symbol. Return a single quote for symbol.
@ -515,10 +516,14 @@ class Client:
contract, ticker, details = await self.get_sym_details(symbol) contract, ticker, details = await self.get_sym_details(symbol)
# ensure a last price gets filled in before we deliver quote # ensure a last price gets filled in before we deliver quote
for _ in range(25): for _ in range(2):
if isnan(ticker.last): if isnan(ticker.last):
log.warning(f'Quote for {symbol} timed out: market is closed?')
ticker = await ticker.updateEvent ticker = await ticker.updateEvent
await asyncio.sleep(0.2) await asyncio.sleep(0.1)
else:
log.info(f'Got first quote for {symbol}')
break
else: else:
log.warning( log.warning(
f'Symbol {symbol} is not returning a quote ' f'Symbol {symbol} is not returning a quote '
@ -1311,7 +1316,8 @@ async def _setup_quote_stream(
# Manually do the dereg ourselves. # Manually do the dereg ourselves.
teardown() teardown()
except trio.WouldBlock: except trio.WouldBlock:
log.warning(f'channel is blocking symbol feed for {symbol}?' log.warning(
f'channel is blocking symbol feed for {symbol}?'
f'\n{to_trio.statistics}' f'\n{to_trio.statistics}'
) )
@ -1372,12 +1378,13 @@ async def stream_quotes(
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Stream symbol quotes. '''
Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked This is a ``trio`` callable routine meant to be invoked
once the brokerd is up. once the brokerd is up.
"""
'''
# TODO: support multiple subscriptions # TODO: support multiple subscriptions
sym = symbols[0] sym = symbols[0]
@ -1418,8 +1425,6 @@ async def stream_quotes(
return init_msgs return init_msgs
if cs.cancelled_caught: if cs.cancelled_caught:
# it might be outside regular trading hours so see if we can at
# least grab history.
contract, first_ticker, details = await _trio_run_client_method( contract, first_ticker, details = await _trio_run_client_method(
method='get_sym_details', method='get_sym_details',
@ -1435,42 +1440,60 @@ async def stream_quotes(
symbol=sym, symbol=sym,
) )
# else:
init_msgs = mk_init_msgs()
con = first_ticker.contract
# should be real volume for this contract by default
calc_price = False
# check for special contract types
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
else: else:
init_msgs = mk_init_msgs() # commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
quote = normalize(first_ticker, calc_price=calc_price)
con = quote['contract']
topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
# pass first quote asap
first_quote = {topic: quote}
if isnan(first_ticker.last):
# it might be outside regular trading hours so see if we can at
# least grab history.
# quote = normalize(first_ticker, calc_price=calc_price)
# con = quote['contract']
# topic = '.'.join((con['symbol'], suffix)).lower()
# quote['symbol'] = topic
# # pass first quote asap
# first_quote = {topic: quote}
task_status.started((init_msgs, first_quote))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
# stream = await start_aio_quote_stream(symbol=sym, contract=contract)
async with open_aio_quote_stream( async with open_aio_quote_stream(
symbol=sym, contract=contract symbol=sym, contract=contract
) as stream: ) as stream:
con = first_ticker.contract
# should be real volume for this contract by default
calc_price = False
# check for special contract types
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
else:
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
quote = normalize(first_ticker, calc_price=calc_price)
con = quote['contract']
topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
# pass first quote asap
first_quote = {topic: quote}
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []