From aeb58c03e25b7428e3acbd583e4d4f9dfd2942de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Jul 2020 14:44:32 -0400 Subject: [PATCH] Add startup logic to handle market closure --- piker/brokers/ib.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 5d7dd748..6e2e208c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -70,6 +70,7 @@ class NonShittyWrapper(Wrapper): def tcpDataArrived(self): """Override time stamps to be floats for now. """ + # use a float to store epoch time instead of datetime self.lastTime = time.time() for ticker in self.pendingTickers: ticker.rtTime = None @@ -88,6 +89,7 @@ class NonShittyIB(ibis.IB): """ def __init__(self): self._createEvents() + # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) self.errorEvent += self._onError @@ -385,11 +387,16 @@ async def stream_quotes( symbol=symbols[0], ) async with aclosing(stream): - async for ticker in stream: - # convert named tuples to dicts so we send usable keys - # for tick_data in ticker.ticks: - ticker.ticks = [td._asdict() for td in ticker.ticks] + # first quote can be ignored as a 2nd with newer data is sent? + first_ticker = await stream.__anext__() + data = asdict(first_ticker) + log.debug(f"First ticker received {data}") + yield data + quote_cache = {} + def proc_ticker(ticker: Ticker) -> dict: + # convert named tuples to dicts so we send usable keys + ticker.ticks = [td._asdict() for td in ticker.ticks] data = asdict(ticker) # add time stamps for downstream latency measurements @@ -397,7 +404,21 @@ async def stream_quotes( if ticker.rtTime: data['rtTime_s'] = float(ticker.rtTime) / 1000. - yield data + return data + + async for ticker in stream: + # spin consuming tickers until we get a real market datum + if not ticker.rtTime: + log.debug(f"New unsent ticker: {ticker}") + continue + else: + yield proc_ticker(ticker) + log.debug("Received first real volume tick") + # XXX: this works because we don't use ``aclosing()`` above? + break + + async for ticker in stream: + yield proc_ticker(ticker) # ugh, clear ticks since we've consumed them ticker.ticks = []