Re-request quote feed on data reset events

When a network outage or data feed connection is reset often the
`ib_insync` task will hang until some kind of (internal?) timeout takes
place or, in some (worst) cases it never re-establishes (the event
stream) and thus the backend needs to restart or the live feed will
never resume..

In order to avoid this issue once and for all this patch implements an
additional (extremely simple) task that is started with the  real-time
feed and simply waits for any market data reset events; when detected
restarts the `open_aio_quote_stream()` call in a loop using
a surrounding cancel scope.

Been meaning to implement this for ages and it's finally working!
ib_1m_hist
Tyler Goodlet 2022-09-29 17:20:27 -04:00
parent 90a395a069
commit daebb78755
1 changed files with 95 additions and 62 deletions

View File

@ -483,7 +483,9 @@ async def _setup_quote_stream(
to_trio.send_nowait(None) to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients: async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients) caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -563,7 +565,8 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol) from_aio = _quote_streams.get(symbol)
if from_aio: if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer # if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver( async with broadcast_receiver(
from_aio, from_aio,
2**6, 2**6,
@ -754,17 +757,45 @@ async def stream_quotes(
await trio.sleep_forever() await trio.sleep_forever()
return # we never expect feed to come up? return # we never expect feed to come up?
async with open_aio_quote_stream( cs: Optional[trio.CancelScope] = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream: ) as stream,
):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream): async with aclosing(stream):
if syminfo.get('no_vlm', False): if syminfo.get('no_vlm', False):
@ -772,29 +803,31 @@ async def stream_quotes(
# include vlm data. # include vlm data.
atype = syminfo['asset_type'] atype = syminfo['asset_type']
log.info( log.info(
f'Non-vlm asset {sym}@{atype}, skipping quote poll...' f'No-vlm {sym}@{atype}, skipping quote poll'
) )
else: else:
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be
# closed)
while True: while True:
ticker = await stream.receive() ticker = await stream.receive()
# for a real volume contract we rait for the first # for a real volume contract we rait for
# "real" trade to take place # the first "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
not ticker.rtTime not ticker.rtTime
): ):
# spin consuming tickers until we get a real # spin consuming tickers until we
# market datum # get a real market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first real volume tick") log.debug("Received first volume tick")
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've
# (ahem, ib_insync is truly stateful trash) # consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use