Properly teardown data feed on cancel

ib_backend
Tyler Goodlet 2020-08-26 21:43:21 -04:00
parent b7c924046a
commit 103014aa58
1 changed files with 25 additions and 6 deletions

View File

@ -21,6 +21,7 @@ from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
import trio
import tractor import tractor
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
@ -141,7 +142,7 @@ class Client:
# durationStr='1 D', # durationStr='1 D',
# time length calcs # time length calcs
durationStr='{count} S'.format(count=3000 * 5), durationStr='{count} S'.format(count=2000 * 5),
barSizeSetting='5 secs', barSizeSetting='5 secs',
# always use extended hours # always use extended hours
@ -256,11 +257,19 @@ class Client:
""" """
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
def push(t): def push(t):
log.debug(t) log.debug(t)
to_trio.send_nowait(t) try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract)
ticker.updateEvent.connect(push) ticker.updateEvent.connect(push)
@ -440,16 +449,18 @@ def normalize(
# add time stamps for downstream latency measurements # add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time() data['brokerd_ts'] = time.time()
if ticker.rtTime: if ticker.rtTime:
data['rtTime_s'] = float(ticker.rtTime) / 1000. data['broker_ts'] = data['rtTime_s'] = float(ticker.rtTime) / 1000.
return data return data
@tractor.msg.pub # @tractor.msg.pub
async def stream_quotes( async def stream_quotes(
get_topics: Callable,
symbols: List[str], symbols: List[str],
loglevel: str = None, loglevel: str = None,
# compat for @tractor.msg.pub
topics: Any = None,
get_topics: Callable = None,
) -> AsyncGenerator[str, Dict[str, Any]]: ) -> AsyncGenerator[str, Dict[str, Any]]:
"""Stream symbol quotes. """Stream symbol quotes.
@ -483,6 +494,10 @@ async def stream_quotes(
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
yield {topic: quote} yield {topic: quote}
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
async for ticker in stream: async for ticker in stream:
# spin consuming tickers until we get a real market datum # spin consuming tickers until we get a real market datum
if not ticker.rtTime: if not ticker.rtTime:
@ -494,6 +509,10 @@ async def stream_quotes(
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
yield {topic: quote} yield {topic: quote}
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use
# ``aclosing()`` above? # ``aclosing()`` above?
break break