diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index a5a4b254..50119c38 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -21,6 +21,7 @@ from ib_insync.ticker import Ticker import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client +import trio import tractor from ..log import get_logger, get_console_log @@ -141,7 +142,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=3000 * 5), + durationStr='{count} S'.format(count=2000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -256,11 +257,19 @@ class Client: """ contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) def push(t): log.debug(t) - to_trio.send_nowait(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + self.ib.cancelMktData(contract) ticker.updateEvent.connect(push) @@ -440,16 +449,18 @@ def normalize( # add time stamps for downstream latency measurements data['brokerd_ts'] = time.time() if ticker.rtTime: - data['rtTime_s'] = float(ticker.rtTime) / 1000. + data['broker_ts'] = data['rtTime_s'] = float(ticker.rtTime) / 1000. return data -@tractor.msg.pub +# @tractor.msg.pub async def stream_quotes( - get_topics: Callable, symbols: List[str], loglevel: str = None, + # compat for @tractor.msg.pub + topics: Any = None, + get_topics: Callable = None, ) -> AsyncGenerator[str, Dict[str, Any]]: """Stream symbol quotes. @@ -483,6 +494,10 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() yield {topic: quote} + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + async for ticker in stream: # spin consuming tickers until we get a real market datum if not ticker.rtTime: @@ -494,6 +509,10 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() yield {topic: quote} + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + ticker.ticks = [] + # XXX: this works because we don't use # ``aclosing()`` above? break