Add startup logic to handle market closure

its_happening
Tyler Goodlet 2020-07-28 14:44:32 -04:00
parent b16bc9b42d
commit 80b656e2ab
1 changed files with 26 additions and 5 deletions

View File

@ -70,6 +70,7 @@ class NonShittyWrapper(Wrapper):
def tcpDataArrived(self): def tcpDataArrived(self):
"""Override time stamps to be floats for now. """Override time stamps to be floats for now.
""" """
# use a float to store epoch time instead of datetime
self.lastTime = time.time() self.lastTime = time.time()
for ticker in self.pendingTickers: for ticker in self.pendingTickers:
ticker.rtTime = None ticker.rtTime = None
@ -88,6 +89,7 @@ class NonShittyIB(ibis.IB):
""" """
def __init__(self): def __init__(self):
self._createEvents() self._createEvents()
# XXX: just to override this wrapper
self.wrapper = NonShittyWrapper(self) self.wrapper = NonShittyWrapper(self)
self.client = ib_Client(self.wrapper) self.client = ib_Client(self.wrapper)
self.errorEvent += self._onError self.errorEvent += self._onError
@ -385,11 +387,16 @@ async def stream_quotes(
symbol=symbols[0], symbol=symbols[0],
) )
async with aclosing(stream): async with aclosing(stream):
async for ticker in stream: # first quote can be ignored as a 2nd with newer data is sent?
# convert named tuples to dicts so we send usable keys first_ticker = await stream.__anext__()
# for tick_data in ticker.ticks: data = asdict(first_ticker)
ticker.ticks = [td._asdict() for td in ticker.ticks] log.debug(f"First ticker received {data}")
yield data
quote_cache = {}
def proc_ticker(ticker: Ticker) -> dict:
# convert named tuples to dicts so we send usable keys
ticker.ticks = [td._asdict() for td in ticker.ticks]
data = asdict(ticker) data = asdict(ticker)
# add time stamps for downstream latency measurements # add time stamps for downstream latency measurements
@ -397,7 +404,21 @@ async def stream_quotes(
if ticker.rtTime: if ticker.rtTime:
data['rtTime_s'] = float(ticker.rtTime) / 1000. data['rtTime_s'] = float(ticker.rtTime) / 1000.
yield data return data
async for ticker in stream:
# spin consuming tickers until we get a real market datum
if not ticker.rtTime:
log.debug(f"New unsent ticker: {ticker}")
continue
else:
yield proc_ticker(ticker)
log.debug("Received first real volume tick")
# XXX: this works because we don't use ``aclosing()`` above?
break
async for ticker in stream:
yield proc_ticker(ticker)
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []