commit
cef2cdd6b6
|
@ -507,6 +507,7 @@ class Client:
|
||||||
async def get_quote(
|
async def get_quote(
|
||||||
self,
|
self,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
|
|
||||||
) -> tuple[Contract, Ticker, ContractDetails]:
|
) -> tuple[Contract, Ticker, ContractDetails]:
|
||||||
'''
|
'''
|
||||||
Return a single quote for symbol.
|
Return a single quote for symbol.
|
||||||
|
@ -515,10 +516,14 @@ class Client:
|
||||||
contract, ticker, details = await self.get_sym_details(symbol)
|
contract, ticker, details = await self.get_sym_details(symbol)
|
||||||
|
|
||||||
# ensure a last price gets filled in before we deliver quote
|
# ensure a last price gets filled in before we deliver quote
|
||||||
for _ in range(25):
|
for _ in range(2):
|
||||||
if isnan(ticker.last):
|
if isnan(ticker.last):
|
||||||
|
log.warning(f'Quote for {symbol} timed out: market is closed?')
|
||||||
ticker = await ticker.updateEvent
|
ticker = await ticker.updateEvent
|
||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.1)
|
||||||
|
else:
|
||||||
|
log.info(f'Got first quote for {symbol}')
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Symbol {symbol} is not returning a quote '
|
f'Symbol {symbol} is not returning a quote '
|
||||||
|
@ -1311,7 +1316,8 @@ async def _setup_quote_stream(
|
||||||
# Manually do the dereg ourselves.
|
# Manually do the dereg ourselves.
|
||||||
teardown()
|
teardown()
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.warning(f'channel is blocking symbol feed for {symbol}?'
|
log.warning(
|
||||||
|
f'channel is blocking symbol feed for {symbol}?'
|
||||||
f'\n{to_trio.statistics}'
|
f'\n{to_trio.statistics}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1372,16 +1378,17 @@ async def stream_quotes(
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Stream symbol quotes.
|
'''
|
||||||
|
Stream symbol quotes.
|
||||||
|
|
||||||
This is a ``trio`` callable routine meant to be invoked
|
This is a ``trio`` callable routine meant to be invoked
|
||||||
once the brokerd is up.
|
once the brokerd is up.
|
||||||
"""
|
|
||||||
|
|
||||||
|
'''
|
||||||
# TODO: support multiple subscriptions
|
# TODO: support multiple subscriptions
|
||||||
sym = symbols[0]
|
sym = symbols[0]
|
||||||
|
|
||||||
with trio.fail_after(16) as cs:
|
with trio.fail_after(16):
|
||||||
contract, first_ticker, details = await _trio_run_client_method(
|
contract, first_ticker, details = await _trio_run_client_method(
|
||||||
method='get_quote',
|
method='get_quote',
|
||||||
symbol=sym,
|
symbol=sym,
|
||||||
|
@ -1417,60 +1424,51 @@ async def stream_quotes(
|
||||||
}
|
}
|
||||||
return init_msgs
|
return init_msgs
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
init_msgs = mk_init_msgs()
|
||||||
# it might be outside regular trading hours so see if we can at
|
con = first_ticker.contract
|
||||||
# least grab history.
|
|
||||||
|
|
||||||
contract, first_ticker, details = await _trio_run_client_method(
|
# should be real volume for this contract by default
|
||||||
method='get_sym_details',
|
calc_price = False
|
||||||
symbol=sym,
|
|
||||||
)
|
|
||||||
|
|
||||||
# init_msgs = mk_init_msgs()
|
# check for special contract types
|
||||||
|
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
|
||||||
|
|
||||||
# try again but without timeout and then do feed startup once we
|
suffix = con.primaryExchange
|
||||||
# get one.
|
if not suffix:
|
||||||
contract, first_ticker, details = await _trio_run_client_method(
|
suffix = con.exchange
|
||||||
method='get_quote',
|
|
||||||
symbol=sym,
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
init_msgs = mk_init_msgs()
|
# commodities and forex don't have an exchange name and
|
||||||
|
# no real volume so we have to calculate the price
|
||||||
|
suffix = con.secType
|
||||||
|
# no real volume on this tract
|
||||||
|
calc_price = True
|
||||||
|
|
||||||
|
quote = normalize(first_ticker, calc_price=calc_price)
|
||||||
|
con = quote['contract']
|
||||||
|
topic = '.'.join((con['symbol'], suffix)).lower()
|
||||||
|
quote['symbol'] = topic
|
||||||
|
|
||||||
|
# pass first quote asap
|
||||||
|
first_quote = {topic: quote}
|
||||||
|
|
||||||
|
# it might be outside regular trading hours so see if we can at
|
||||||
|
# least grab history.
|
||||||
|
if isnan(first_ticker.last):
|
||||||
|
task_status.started((init_msgs, first_quote))
|
||||||
|
|
||||||
|
# it's not really live but this will unblock
|
||||||
|
# the brokerd feed task to tell the ui to update?
|
||||||
|
feed_is_live.set()
|
||||||
|
|
||||||
|
# block and let data history backfill code run.
|
||||||
|
await trio.sleep_forever()
|
||||||
|
return # we never expect feed to come up?
|
||||||
|
|
||||||
# stream = await start_aio_quote_stream(symbol=sym, contract=contract)
|
|
||||||
async with open_aio_quote_stream(
|
async with open_aio_quote_stream(
|
||||||
symbol=sym, contract=contract
|
symbol=sym, contract=contract
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
con = first_ticker.contract
|
|
||||||
|
|
||||||
# should be real volume for this contract by default
|
|
||||||
calc_price = False
|
|
||||||
|
|
||||||
# check for special contract types
|
|
||||||
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
|
|
||||||
|
|
||||||
suffix = con.primaryExchange
|
|
||||||
if not suffix:
|
|
||||||
suffix = con.exchange
|
|
||||||
|
|
||||||
else:
|
|
||||||
# commodities and forex don't have an exchange name and
|
|
||||||
# no real volume so we have to calculate the price
|
|
||||||
suffix = con.secType
|
|
||||||
# no real volume on this tract
|
|
||||||
calc_price = True
|
|
||||||
|
|
||||||
quote = normalize(first_ticker, calc_price=calc_price)
|
|
||||||
con = quote['contract']
|
|
||||||
topic = '.'.join((con['symbol'], suffix)).lower()
|
|
||||||
quote['symbol'] = topic
|
|
||||||
|
|
||||||
# pass first quote asap
|
|
||||||
first_quote = {topic: quote}
|
|
||||||
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
# (ahem, ib_insync is stateful trash)
|
# (ahem, ib_insync is stateful trash)
|
||||||
first_ticker.ticks = []
|
first_ticker.ticks = []
|
||||||
|
|
|
@ -1020,7 +1020,11 @@ async def _emsd_main(
|
||||||
|
|
||||||
book = _router.get_dark_book(broker)
|
book = _router.get_dark_book(broker)
|
||||||
last = book.lasts[(broker, symbol)] = first_quote['last']
|
last = book.lasts[(broker, symbol)] = first_quote['last']
|
||||||
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
|
|
||||||
|
# XXX: ib is a cucker but we've fixed avoiding receiving any
|
||||||
|
# `Nan`s in the backend during market hours (right?). this was
|
||||||
|
# here previously as a sanity check during market hours.
|
||||||
|
# assert not isnan(last)
|
||||||
|
|
||||||
# open a stream with the brokerd backend for order
|
# open a stream with the brokerd backend for order
|
||||||
# flow dialogue
|
# flow dialogue
|
||||||
|
|
Loading…
Reference in New Issue