diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 614c7f4b..eb321944 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -483,7 +483,9 @@ async def _setup_quote_stream( to_trio.send_nowait(None) - async with load_aio_clients() as accts2clients: + async with load_aio_clients( + disconnect_on_exit=False, + ) as accts2clients: caccount_name, client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -563,7 +565,8 @@ async def open_aio_quote_stream( from_aio = _quote_streams.get(symbol) if from_aio: - # if we already have a cached feed deliver a rx side clone to consumer + # if we already have a cached feed deliver a rx side clone + # to consumer async with broadcast_receiver( from_aio, 2**6, @@ -754,67 +757,97 @@ async def stream_quotes( await trio.sleep_forever() return # we never expect feed to come up? - async with open_aio_quote_stream( - symbol=sym, - contract=con, - ) as stream: - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - - task_status.started((init_msgs, first_quote)) - - async with aclosing(stream): - if syminfo.get('no_vlm', False): - - # generally speaking these feeds don't - # include vlm data. - atype = syminfo['asset_type'] - log.info( - f'Non-vlm asset {sym}@{atype}, skipping quote poll...' - ) - - else: - # wait for real volume on feed (trading might be closed) - while True: - ticker = await stream.receive() - - # for a real volume contract we rait for the first - # "real" trade to take place - if ( - # not calc_price - # and not ticker.rtTime - not ticker.rtTime - ): - # spin consuming tickers until we get a real - # market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - quote = normalize(ticker) - log.debug(f"First ticker received {quote}") - - # tell caller quotes are now coming in live - feed_is_live.set() - - # last = time.time() - async for ticker in stream: - quote = normalize(ticker) - await send_chan.send({quote['fqsn']: quote}) - + cs: Optional[trio.CancelScope] = None + startup: bool = True + while ( + startup + or cs.cancel_called + ): + with trio.CancelScope() as cs: + async with ( + trio.open_nursery() as nurse, + open_aio_quote_stream( + symbol=sym, + contract=con, + ) as stream, + ): # ugh, clear ticks since we've consumed them - ticker.ticks = [] - # last = time.time() + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + # only on first entry at feed boot up + if startup: + startup = False + task_status.started((init_msgs, first_quote)) + + # start a stream restarter task which monitors the + # data feed event. + async def reset_on_feed(): + + # TODO: this seems to be surpressed from the + # traceback in ``tractor``? + # assert 0 + + rt_ev = proxy.status_event( + 'Market data farm connection is OK:usfarm' + ) + await rt_ev.wait() + cs.cancel() # cancel called should now be set + + nurse.start_soon(reset_on_feed) + + async with aclosing(stream): + if syminfo.get('no_vlm', False): + + # generally speaking these feeds don't + # include vlm data. + atype = syminfo['asset_type'] + log.info( + f'No-vlm {sym}@{atype}, skipping quote poll' + ) + + else: + # wait for real volume on feed (trading might be + # closed) + while True: + ticker = await stream.receive() + + # for a real volume contract we rait for + # the first "real" trade to take place + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): + # spin consuming tickers until we + # get a real market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first volume tick") + # ugh, clear ticks since we've + # consumed them (ahem, ib_insync is + # truly stateful trash) + ticker.ticks = [] + + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + + # tell caller quotes are now coming in live + feed_is_live.set() + + # last = time.time() + async for ticker in stream: + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() async def data_reset_hack(