From e49417a4b82e3fc4f011efd26e536347dedaf927 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jul 2020 00:03:17 -0400 Subject: [PATCH] Add normalization step for ticks Start a draft normalization format for (sampled) tick data. Ideally we move toward the dense tick format (DFT) enforced by techtonicDB, but for now let's just get a dict of something simple going: `{'type': 'trade', 'price': List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ + bars_kwargs = {'whatToShow': 'TRADES'} + contract = await self.find_contract(symbol) + bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) + # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime='', # durationStr='60 S', # durationStr='1 D', + + # time length calcs durationStr='{count} S'.format(count=3000 * 5), barSizeSetting='5 secs', - whatToShow='TRADES', - useRTH=False + + # always use extended hours + useRTH=False, + + # restricted per contract type + **bars_kwargs, + # whatToShow='MIDPOINT', + # whatToShow='TRADES', ) - # TODO: raise underlying error here - assert bars + if not bars: + # TODO: raise underlying error here + raise ValueError(f"No bars retreived for {symbol}?") + # convert to pandas dataframe: df = ibis.util.df(bars) return from_df(df) @@ -205,8 +228,9 @@ class Client: con = await self.get_cont_fute(symbol=sym, exchange=exch) elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY - con = ibis.Commodity(symbol=sym) - + con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] + con = ibis.Commodity(**con_kwargs) + con.bars_kwargs = bars_kwargs else: con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) @@ -222,12 +246,19 @@ class Client: symbol: str, to_trio, opts: Tuple[int] = ('233', '375'), + # opts: Tuple[int] = ('459',), ) -> None: """Stream a ticker using the std L1 api. """ contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) + # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) + + def push(t): + log.debug(t) + to_trio.send_nowait(t) + + ticker.updateEvent.connect(push) # let the engine run and stream await self.ib.disconnectedEvent @@ -371,15 +402,52 @@ async def get_client( yield get_method_proxy(portal, Client) +def normalize( + ticker: Ticker, + calc_price: bool = False +) -> dict: + # convert named tuples to dicts so we send usable keys + new_ticks = [] + for tick in ticker.ticks: + td = tick._asdict() + + if td['tickType'] in (48, 77): + td['type'] = 'trade' + + new_ticks.append(td) + + ticker.ticks = new_ticks + + # some contracts don't have volume so we may want to + # calculate a midpoint price based on data we can acquire + # (such as bid / ask) + if calc_price: + ticker.ticks.append( + {'type': 'trade', 'price': ticker.marketPrice()} + ) + + # serialize for transport + data = asdict(ticker) + + # add time stamps for downstream latency measurements + data['brokerd_ts'] = time.time() + if ticker.rtTime: + data['rtTime_s'] = float(ticker.rtTime) / 1000. + + return data + + async def stream_quotes( symbols: List[str], + loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. """ - get_console_log('info') + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) stream = await tractor.to_asyncio.run_task( _trio_run_client_method, @@ -389,36 +457,36 @@ async def stream_quotes( async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - data = asdict(first_ticker) - log.debug(f"First ticker received {data}") - yield data - quote_cache = {} + # quote_cache = {} - def proc_ticker(ticker: Ticker) -> dict: - # convert named tuples to dicts so we send usable keys - ticker.ticks = [td._asdict() for td in ticker.ticks] - data = asdict(ticker) + if type(first_ticker.contract) not in (ibis.Commodity,): - # add time stamps for downstream latency measurements - data['brokerd_ts'] = time.time() - if ticker.rtTime: - data['rtTime_s'] = float(ticker.rtTime) / 1000. + calc_price = False # should be real volume for contract - return data + data = normalize(first_ticker) + + log.debug(f"First ticker received {data}") + yield data + + async for ticker in stream: + # spin consuming tickers until we get a real market datum + if not ticker.rtTime: + log.debug(f"New unsent ticker: {ticker}") + continue + else: + yield normalize(ticker) + log.debug("Received first real volume tick") + # XXX: this works because we don't use + # ``aclosing()`` above? + break + else: + calc_price = True async for ticker in stream: - # spin consuming tickers until we get a real market datum - if not ticker.rtTime: - log.debug(f"New unsent ticker: {ticker}") - continue - else: - yield proc_ticker(ticker) - log.debug("Received first real volume tick") - # XXX: this works because we don't use ``aclosing()`` above? - break - - async for ticker in stream: - yield proc_ticker(ticker) + yield normalize( + ticker, + calc_price=calc_price + ) # ugh, clear ticks since we've consumed them ticker.ticks = []