Add symbol field to ib quotes

bar_select
Tyler Goodlet 2020-10-29 17:21:41 -04:00
parent da0789e184
commit 307c501763
1 changed files with 14 additions and 3 deletions

View File

@ -33,6 +33,7 @@ from ..data import (
subscribe_ohlc_for_increment, subscribe_ohlc_for_increment,
) )
from ..data._source import from_df from ..data._source import from_df
from ._util import SymbolNotFound
log = get_logger(__name__) log = get_logger(__name__)
@ -88,8 +89,7 @@ class NonShittyWrapper(Wrapper):
class NonShittyIB(ibis.IB): class NonShittyIB(ibis.IB):
"""The beginning of overriding quite a few quetionable decisions """The beginning of overriding quite a few decisions in this lib.
in this lib.
- Don't use datetimes - Don't use datetimes
- Don't use named tuples - Don't use named tuples
@ -117,12 +117,14 @@ class Client:
"""IB wrapped for our broker backend API. """IB wrapped for our broker backend API.
Note: this client requires running inside an ``asyncio`` loop. Note: this client requires running inside an ``asyncio`` loop.
""" """
def __init__( def __init__(
self, self,
ib: ibis.IB, ib: ibis.IB,
) -> None: ) -> None:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True
async def bars( async def bars(
self, self,
@ -563,6 +565,9 @@ async def stream_quotes(
symbol=sym, symbol=sym,
) )
if bars is None:
raise SymbolNotFound(sym)
# write historical data to buffer # write historical data to buffer
shm.push(bars) shm.push(bars)
shm_token = shm.token shm_token = shm.token
@ -576,7 +581,9 @@ async def stream_quotes(
# first quote can be ignored as a 2nd with newer data is sent? # first quote can be ignored as a 2nd with newer data is sent?
first_ticker = await stream.__anext__() first_ticker = await stream.__anext__()
quote = normalize(first_ticker) quote = normalize(first_ticker)
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
@ -608,9 +615,11 @@ async def stream_quotes(
calc_price = True calc_price = True
ticker = first_ticker ticker = first_ticker
con = quote['contract']
quote = normalize(ticker, calc_price=calc_price) quote = normalize(ticker, calc_price=calc_price)
con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
quote['symbol'] = topic
first_quote = {topic: quote} first_quote = {topic: quote}
ticker.ticks = [] ticker.ticks = []
@ -623,6 +632,7 @@ async def stream_quotes(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
) )
quote['symbol'] = topic
# TODO: in theory you can send the IPC msg *before* # TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency, # writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support # however, that will require `tractor.msg.pub` support
@ -648,6 +658,7 @@ async def stream_quotes(
con = quote['contract'] con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
quote['symbol'] = topic
await ctx.send_yield({topic: quote}) await ctx.send_yield({topic: quote})