Re-request quote feed on data reset events
When a network outage or data feed connection is reset often the `ib_insync` task will hang until some kind of (internal?) timeout takes place or, in some (worst) cases it never re-establishes (the event stream) and thus the backend needs to restart or the live feed will never resume.. In order to avoid this issue once and for all this patch implements an additional (extremely simple) task that is started with the real-time feed and simply waits for any market data reset events; when detected restarts the `open_aio_quote_stream()` call in a loop using a surrounding cancel scope. Been meaning to implement this for ages and it's finally working!clears_table_events
parent
1d7e642dbd
commit
28535fa977
|
@ -483,7 +483,9 @@ async def _setup_quote_stream(
|
|||
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
async with load_aio_clients() as accts2clients:
|
||||
async with load_aio_clients(
|
||||
disconnect_on_exit=False,
|
||||
) as accts2clients:
|
||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||
contract = contract or (await client.find_contract(symbol))
|
||||
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
||||
|
@ -563,7 +565,8 @@ async def open_aio_quote_stream(
|
|||
from_aio = _quote_streams.get(symbol)
|
||||
if from_aio:
|
||||
|
||||
# if we already have a cached feed deliver a rx side clone to consumer
|
||||
# if we already have a cached feed deliver a rx side clone
|
||||
# to consumer
|
||||
async with broadcast_receiver(
|
||||
from_aio,
|
||||
2**6,
|
||||
|
@ -754,67 +757,97 @@ async def stream_quotes(
|
|||
await trio.sleep_forever()
|
||||
return # we never expect feed to come up?
|
||||
|
||||
async with open_aio_quote_stream(
|
||||
symbol=sym,
|
||||
contract=con,
|
||||
) as stream:
|
||||
|
||||
# ugh, clear ticks since we've consumed them
|
||||
# (ahem, ib_insync is stateful trash)
|
||||
first_ticker.ticks = []
|
||||
|
||||
task_status.started((init_msgs, first_quote))
|
||||
|
||||
async with aclosing(stream):
|
||||
if syminfo.get('no_vlm', False):
|
||||
|
||||
# generally speaking these feeds don't
|
||||
# include vlm data.
|
||||
atype = syminfo['asset_type']
|
||||
log.info(
|
||||
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
|
||||
)
|
||||
|
||||
else:
|
||||
# wait for real volume on feed (trading might be closed)
|
||||
while True:
|
||||
ticker = await stream.receive()
|
||||
|
||||
# for a real volume contract we rait for the first
|
||||
# "real" trade to take place
|
||||
if (
|
||||
# not calc_price
|
||||
# and not ticker.rtTime
|
||||
not ticker.rtTime
|
||||
):
|
||||
# spin consuming tickers until we get a real
|
||||
# market datum
|
||||
log.debug(f"New unsent ticker: {ticker}")
|
||||
continue
|
||||
else:
|
||||
log.debug("Received first real volume tick")
|
||||
# ugh, clear ticks since we've consumed them
|
||||
# (ahem, ib_insync is truly stateful trash)
|
||||
ticker.ticks = []
|
||||
|
||||
# XXX: this works because we don't use
|
||||
# ``aclosing()`` above?
|
||||
break
|
||||
|
||||
quote = normalize(ticker)
|
||||
log.debug(f"First ticker received {quote}")
|
||||
|
||||
# tell caller quotes are now coming in live
|
||||
feed_is_live.set()
|
||||
|
||||
# last = time.time()
|
||||
async for ticker in stream:
|
||||
quote = normalize(ticker)
|
||||
await send_chan.send({quote['fqsn']: quote})
|
||||
|
||||
cs: Optional[trio.CancelScope] = None
|
||||
startup: bool = True
|
||||
while (
|
||||
startup
|
||||
or cs.cancel_called
|
||||
):
|
||||
with trio.CancelScope() as cs:
|
||||
async with (
|
||||
trio.open_nursery() as nurse,
|
||||
open_aio_quote_stream(
|
||||
symbol=sym,
|
||||
contract=con,
|
||||
) as stream,
|
||||
):
|
||||
# ugh, clear ticks since we've consumed them
|
||||
ticker.ticks = []
|
||||
# last = time.time()
|
||||
# (ahem, ib_insync is stateful trash)
|
||||
first_ticker.ticks = []
|
||||
|
||||
# only on first entry at feed boot up
|
||||
if startup:
|
||||
startup = False
|
||||
task_status.started((init_msgs, first_quote))
|
||||
|
||||
# start a stream restarter task which monitors the
|
||||
# data feed event.
|
||||
async def reset_on_feed():
|
||||
|
||||
# TODO: this seems to be surpressed from the
|
||||
# traceback in ``tractor``?
|
||||
# assert 0
|
||||
|
||||
rt_ev = proxy.status_event(
|
||||
'Market data farm connection is OK:usfarm'
|
||||
)
|
||||
await rt_ev.wait()
|
||||
cs.cancel() # cancel called should now be set
|
||||
|
||||
nurse.start_soon(reset_on_feed)
|
||||
|
||||
async with aclosing(stream):
|
||||
if syminfo.get('no_vlm', False):
|
||||
|
||||
# generally speaking these feeds don't
|
||||
# include vlm data.
|
||||
atype = syminfo['asset_type']
|
||||
log.info(
|
||||
f'No-vlm {sym}@{atype}, skipping quote poll'
|
||||
)
|
||||
|
||||
else:
|
||||
# wait for real volume on feed (trading might be
|
||||
# closed)
|
||||
while True:
|
||||
ticker = await stream.receive()
|
||||
|
||||
# for a real volume contract we rait for
|
||||
# the first "real" trade to take place
|
||||
if (
|
||||
# not calc_price
|
||||
# and not ticker.rtTime
|
||||
not ticker.rtTime
|
||||
):
|
||||
# spin consuming tickers until we
|
||||
# get a real market datum
|
||||
log.debug(f"New unsent ticker: {ticker}")
|
||||
continue
|
||||
else:
|
||||
log.debug("Received first volume tick")
|
||||
# ugh, clear ticks since we've
|
||||
# consumed them (ahem, ib_insync is
|
||||
# truly stateful trash)
|
||||
ticker.ticks = []
|
||||
|
||||
# XXX: this works because we don't use
|
||||
# ``aclosing()`` above?
|
||||
break
|
||||
|
||||
quote = normalize(ticker)
|
||||
log.debug(f"First ticker received {quote}")
|
||||
|
||||
# tell caller quotes are now coming in live
|
||||
feed_is_live.set()
|
||||
|
||||
# last = time.time()
|
||||
async for ticker in stream:
|
||||
quote = normalize(ticker)
|
||||
await send_chan.send({quote['fqsn']: quote})
|
||||
|
||||
# ugh, clear ticks since we've consumed them
|
||||
ticker.ticks = []
|
||||
# last = time.time()
|
||||
|
||||
|
||||
async def data_reset_hack(
|
||||
|
|
Loading…
Reference in New Issue