Properly teardown data feed on cancel
parent
69aced7521
commit
778e3c7b06
|
@ -21,6 +21,7 @@ from ib_insync.ticker import Ticker
|
||||||
import ib_insync as ibis
|
import ib_insync as ibis
|
||||||
from ib_insync.wrapper import Wrapper
|
from ib_insync.wrapper import Wrapper
|
||||||
from ib_insync.client import Client as ib_Client
|
from ib_insync.client import Client as ib_Client
|
||||||
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
|
@ -141,7 +142,7 @@ class Client:
|
||||||
# durationStr='1 D',
|
# durationStr='1 D',
|
||||||
|
|
||||||
# time length calcs
|
# time length calcs
|
||||||
durationStr='{count} S'.format(count=3000 * 5),
|
durationStr='{count} S'.format(count=2000 * 5),
|
||||||
barSizeSetting='5 secs',
|
barSizeSetting='5 secs',
|
||||||
|
|
||||||
# always use extended hours
|
# always use extended hours
|
||||||
|
@ -256,11 +257,19 @@ class Client:
|
||||||
"""
|
"""
|
||||||
contract = await self.find_contract(symbol)
|
contract = await self.find_contract(symbol)
|
||||||
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
|
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
|
||||||
# ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
|
|
||||||
|
|
||||||
def push(t):
|
def push(t):
|
||||||
log.debug(t)
|
log.debug(t)
|
||||||
to_trio.send_nowait(t)
|
try:
|
||||||
|
to_trio.send_nowait(t)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
# XXX: eventkit's ``Event.emit()`` for whatever redic
|
||||||
|
# reason will catch and ignore regular exceptions
|
||||||
|
# resulting in tracebacks spammed to console..
|
||||||
|
# Manually do the dereg ourselves.
|
||||||
|
ticker.updateEvent.disconnect(push)
|
||||||
|
log.error(f"Disconnected stream for `{symbol}`")
|
||||||
|
self.ib.cancelMktData(contract)
|
||||||
|
|
||||||
ticker.updateEvent.connect(push)
|
ticker.updateEvent.connect(push)
|
||||||
|
|
||||||
|
@ -440,16 +449,18 @@ def normalize(
|
||||||
# add time stamps for downstream latency measurements
|
# add time stamps for downstream latency measurements
|
||||||
data['brokerd_ts'] = time.time()
|
data['brokerd_ts'] = time.time()
|
||||||
if ticker.rtTime:
|
if ticker.rtTime:
|
||||||
data['rtTime_s'] = float(ticker.rtTime) / 1000.
|
data['broker_ts'] = data['rtTime_s'] = float(ticker.rtTime) / 1000.
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
@tractor.msg.pub
|
# @tractor.msg.pub
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
get_topics: Callable,
|
|
||||||
symbols: List[str],
|
symbols: List[str],
|
||||||
loglevel: str = None,
|
loglevel: str = None,
|
||||||
|
# compat for @tractor.msg.pub
|
||||||
|
topics: Any = None,
|
||||||
|
get_topics: Callable = None,
|
||||||
) -> AsyncGenerator[str, Dict[str, Any]]:
|
) -> AsyncGenerator[str, Dict[str, Any]]:
|
||||||
"""Stream symbol quotes.
|
"""Stream symbol quotes.
|
||||||
|
|
||||||
|
@ -483,6 +494,10 @@ async def stream_quotes(
|
||||||
topic = '.'.join((con['symbol'], con[suffix])).lower()
|
topic = '.'.join((con['symbol'], con[suffix])).lower()
|
||||||
yield {topic: quote}
|
yield {topic: quote}
|
||||||
|
|
||||||
|
# ugh, clear ticks since we've consumed them
|
||||||
|
# (ahem, ib_insync is stateful trash)
|
||||||
|
first_ticker.ticks = []
|
||||||
|
|
||||||
async for ticker in stream:
|
async for ticker in stream:
|
||||||
# spin consuming tickers until we get a real market datum
|
# spin consuming tickers until we get a real market datum
|
||||||
if not ticker.rtTime:
|
if not ticker.rtTime:
|
||||||
|
@ -494,6 +509,10 @@ async def stream_quotes(
|
||||||
topic = '.'.join((con['symbol'], con[suffix])).lower()
|
topic = '.'.join((con['symbol'], con[suffix])).lower()
|
||||||
yield {topic: quote}
|
yield {topic: quote}
|
||||||
|
|
||||||
|
# ugh, clear ticks since we've consumed them
|
||||||
|
# (ahem, ib_insync is stateful trash)
|
||||||
|
ticker.ticks = []
|
||||||
|
|
||||||
# XXX: this works because we don't use
|
# XXX: this works because we don't use
|
||||||
# ``aclosing()`` above?
|
# ``aclosing()`` above?
|
||||||
break
|
break
|
||||||
|
|
Loading…
Reference in New Issue