From 307c50176390f730428b59b456ef21e785a11d21 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Oct 2020 17:21:41 -0400 Subject: [PATCH] Add symbol field to ib quotes --- piker/brokers/ib.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9f40a7d9..c2067665 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -33,6 +33,7 @@ from ..data import ( subscribe_ohlc_for_increment, ) from ..data._source import from_df +from ._util import SymbolNotFound log = get_logger(__name__) @@ -88,8 +89,7 @@ class NonShittyWrapper(Wrapper): class NonShittyIB(ibis.IB): - """The beginning of overriding quite a few quetionable decisions - in this lib. + """The beginning of overriding quite a few decisions in this lib. - Don't use datetimes - Don't use named tuples @@ -117,12 +117,14 @@ class Client: """IB wrapped for our broker backend API. Note: this client requires running inside an ``asyncio`` loop. + """ def __init__( self, ib: ibis.IB, ) -> None: self.ib = ib + self.ib.RaiseRequestErrors = True async def bars( self, @@ -563,6 +565,9 @@ async def stream_quotes( symbol=sym, ) + if bars is None: + raise SymbolNotFound(sym) + # write historical data to buffer shm.push(bars) shm_token = shm.token @@ -576,7 +581,9 @@ async def stream_quotes( # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() + quote = normalize(first_ticker) + # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = [] @@ -608,9 +615,11 @@ async def stream_quotes( calc_price = True ticker = first_ticker - con = quote['contract'] quote = normalize(ticker, calc_price=calc_price) + con = quote['contract'] topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic + first_quote = {topic: quote} ticker.ticks = [] @@ -623,6 +632,7 @@ async def stream_quotes( ticker, calc_price=calc_price ) + quote['symbol'] = topic # TODO: in theory you can send the IPC msg *before* # writing to the sharedmem array to decrease latency, # however, that will require `tractor.msg.pub` support @@ -648,6 +658,7 @@ async def stream_quotes( con = quote['contract'] topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic await ctx.send_yield({topic: quote})