Add normalization step for ticks
Start a draft normalization format for (sampled) tick data.
Ideally we move toward the dense tick format (DFT) enforced by
techtonicDB, but for now let's just get a dict of something simple
going: `{'type': 'trade', 'price': <price}` kind of thing. This
gets us started being able to real-time chart from all data feed
back-ends. Oh, and hack in support for XAUUSD..and get subactor
logging workin.
			
			
				its_happening
			
			
		
							parent
							
								
									307bc87738
								
							
						
					
					
						commit
						e49417a4b8
					
				| 
						 | 
					@ -97,6 +97,15 @@ class NonShittyIB(ibis.IB):
 | 
				
			||||||
        self._logger = logging.getLogger('ib_insync.ib')
 | 
					        self._logger = logging.getLogger('ib_insync.ib')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# map of symbols to contract ids
 | 
				
			||||||
 | 
					_adhoc_cmdty_data_map = {
 | 
				
			||||||
 | 
					    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
 | 
				
			||||||
 | 
					    # NOTE: cmdtys don't have trade data:
 | 
				
			||||||
 | 
					    # https://groups.io/g/twsapi/message/44174
 | 
				
			||||||
 | 
					    'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Client:
 | 
					class Client:
 | 
				
			||||||
    """IB wrapped for our broker backend API.
 | 
					    """IB wrapped for our broker backend API.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -119,20 +128,34 @@ class Client:
 | 
				
			||||||
    ) -> List[Dict[str, Any]]:
 | 
					    ) -> List[Dict[str, Any]]:
 | 
				
			||||||
        """Retreive OHLCV bars for a symbol over a range to the present.
 | 
					        """Retreive OHLCV bars for a symbol over a range to the present.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
 | 
					        bars_kwargs = {'whatToShow': 'TRADES'}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        contract = await self.find_contract(symbol)
 | 
					        contract = await self.find_contract(symbol)
 | 
				
			||||||
 | 
					        bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # _min = min(2000*100, count)
 | 
					        # _min = min(2000*100, count)
 | 
				
			||||||
        bars = await self.ib.reqHistoricalDataAsync(
 | 
					        bars = await self.ib.reqHistoricalDataAsync(
 | 
				
			||||||
            contract,
 | 
					            contract,
 | 
				
			||||||
            endDateTime='',
 | 
					            endDateTime='',
 | 
				
			||||||
            # durationStr='60 S',
 | 
					            # durationStr='60 S',
 | 
				
			||||||
            # durationStr='1 D',
 | 
					            # durationStr='1 D',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # time length calcs
 | 
				
			||||||
            durationStr='{count} S'.format(count=3000 * 5),
 | 
					            durationStr='{count} S'.format(count=3000 * 5),
 | 
				
			||||||
            barSizeSetting='5 secs',
 | 
					            barSizeSetting='5 secs',
 | 
				
			||||||
            whatToShow='TRADES',
 | 
					
 | 
				
			||||||
            useRTH=False
 | 
					            # always use extended hours
 | 
				
			||||||
 | 
					            useRTH=False,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # restricted per contract type
 | 
				
			||||||
 | 
					            **bars_kwargs,
 | 
				
			||||||
 | 
					            # whatToShow='MIDPOINT',
 | 
				
			||||||
 | 
					            # whatToShow='TRADES',
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					        if not bars:
 | 
				
			||||||
            # TODO: raise underlying error here
 | 
					            # TODO: raise underlying error here
 | 
				
			||||||
        assert bars
 | 
					            raise ValueError(f"No bars retreived for {symbol}?")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # convert to pandas dataframe:
 | 
					        # convert to pandas dataframe:
 | 
				
			||||||
        df = ibis.util.df(bars)
 | 
					        df = ibis.util.df(bars)
 | 
				
			||||||
        return from_df(df)
 | 
					        return from_df(df)
 | 
				
			||||||
| 
						 | 
					@ -205,8 +228,9 @@ class Client:
 | 
				
			||||||
            con = await self.get_cont_fute(symbol=sym, exchange=exch)
 | 
					            con = await self.get_cont_fute(symbol=sym, exchange=exch)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        elif exch == 'CMDTY':  # eg. XAUSUSD.CMDTY
 | 
					        elif exch == 'CMDTY':  # eg. XAUSUSD.CMDTY
 | 
				
			||||||
            con = ibis.Commodity(symbol=sym)
 | 
					            con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym]
 | 
				
			||||||
 | 
					            con = ibis.Commodity(**con_kwargs)
 | 
				
			||||||
 | 
					            con.bars_kwargs = bars_kwargs
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            con = ibis.Stock(symbol=sym, exchange=exch, currency=currency)
 | 
					            con = ibis.Stock(symbol=sym, exchange=exch, currency=currency)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -222,12 +246,19 @@ class Client:
 | 
				
			||||||
        symbol: str,
 | 
					        symbol: str,
 | 
				
			||||||
        to_trio,
 | 
					        to_trio,
 | 
				
			||||||
        opts: Tuple[int] = ('233', '375'),
 | 
					        opts: Tuple[int] = ('233', '375'),
 | 
				
			||||||
 | 
					        # opts: Tuple[int] = ('459',),
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        """Stream a ticker using the std L1 api.
 | 
					        """Stream a ticker using the std L1 api.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        contract = await self.find_contract(symbol)
 | 
					        contract = await self.find_contract(symbol)
 | 
				
			||||||
        ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
 | 
					        ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
 | 
				
			||||||
        ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
 | 
					        # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def push(t):
 | 
				
			||||||
 | 
					            log.debug(t)
 | 
				
			||||||
 | 
					            to_trio.send_nowait(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        ticker.updateEvent.connect(push)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # let the engine run and stream
 | 
					        # let the engine run and stream
 | 
				
			||||||
        await self.ib.disconnectedEvent
 | 
					        await self.ib.disconnectedEvent
 | 
				
			||||||
| 
						 | 
					@ -371,32 +402,31 @@ async def get_client(
 | 
				
			||||||
        yield get_method_proxy(portal, Client)
 | 
					        yield get_method_proxy(portal, Client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def stream_quotes(
 | 
					def normalize(
 | 
				
			||||||
    symbols: List[str],
 | 
					    ticker: Ticker,
 | 
				
			||||||
) -> AsyncGenerator[str, Dict[str, Any]]:
 | 
					    calc_price: bool = False
 | 
				
			||||||
    """Stream symbol quotes.
 | 
					) -> dict:
 | 
				
			||||||
 | 
					 | 
				
			||||||
    This is a ``trio`` callable routine meant to be invoked
 | 
					 | 
				
			||||||
    once the brokerd is up.
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    get_console_log('info')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    stream = await tractor.to_asyncio.run_task(
 | 
					 | 
				
			||||||
        _trio_run_client_method,
 | 
					 | 
				
			||||||
        method='stream_ticker',
 | 
					 | 
				
			||||||
        symbol=symbols[0],
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
    async with aclosing(stream):
 | 
					 | 
				
			||||||
        # first quote can be ignored as a 2nd with newer data is sent?
 | 
					 | 
				
			||||||
        first_ticker = await stream.__anext__()
 | 
					 | 
				
			||||||
        data = asdict(first_ticker)
 | 
					 | 
				
			||||||
        log.debug(f"First ticker received {data}")
 | 
					 | 
				
			||||||
        yield data
 | 
					 | 
				
			||||||
        quote_cache = {}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        def proc_ticker(ticker: Ticker) -> dict:
 | 
					 | 
				
			||||||
    # convert named tuples to dicts so we send usable keys
 | 
					    # convert named tuples to dicts so we send usable keys
 | 
				
			||||||
            ticker.ticks = [td._asdict() for td in ticker.ticks]
 | 
					    new_ticks = []
 | 
				
			||||||
 | 
					    for tick in ticker.ticks:
 | 
				
			||||||
 | 
					        td = tick._asdict()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if td['tickType'] in (48, 77):
 | 
				
			||||||
 | 
					            td['type'] = 'trade'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        new_ticks.append(td)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ticker.ticks = new_ticks
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # some contracts don't have volume so we may want to 
 | 
				
			||||||
 | 
					    # calculate a midpoint price based on data we can acquire
 | 
				
			||||||
 | 
					    # (such as bid / ask)
 | 
				
			||||||
 | 
					    if calc_price:
 | 
				
			||||||
 | 
					        ticker.ticks.append(
 | 
				
			||||||
 | 
					            {'type': 'trade', 'price': ticker.marketPrice()}
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # serialize for transport
 | 
				
			||||||
    data = asdict(ticker)
 | 
					    data = asdict(ticker)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # add time stamps for downstream latency measurements
 | 
					    # add time stamps for downstream latency measurements
 | 
				
			||||||
| 
						 | 
					@ -406,19 +436,57 @@ async def stream_quotes(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return data
 | 
					    return data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def stream_quotes(
 | 
				
			||||||
 | 
					    symbols: List[str],
 | 
				
			||||||
 | 
					    loglevel: str = None,
 | 
				
			||||||
 | 
					) -> AsyncGenerator[str, Dict[str, Any]]:
 | 
				
			||||||
 | 
					    """Stream symbol quotes.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This is a ``trio`` callable routine meant to be invoked
 | 
				
			||||||
 | 
					    once the brokerd is up.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
				
			||||||
 | 
					    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    stream = await tractor.to_asyncio.run_task(
 | 
				
			||||||
 | 
					        _trio_run_client_method,
 | 
				
			||||||
 | 
					        method='stream_ticker',
 | 
				
			||||||
 | 
					        symbol=symbols[0],
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    async with aclosing(stream):
 | 
				
			||||||
 | 
					        # first quote can be ignored as a 2nd with newer data is sent?
 | 
				
			||||||
 | 
					        first_ticker = await stream.__anext__()
 | 
				
			||||||
 | 
					        # quote_cache = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if type(first_ticker.contract) not in (ibis.Commodity,):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            calc_price = False  # should be real volume for contract
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            data = normalize(first_ticker)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            log.debug(f"First ticker received {data}")
 | 
				
			||||||
 | 
					            yield data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async for ticker in stream:
 | 
					            async for ticker in stream:
 | 
				
			||||||
                # spin consuming tickers until we get a real market datum
 | 
					                # spin consuming tickers until we get a real market datum
 | 
				
			||||||
                if not ticker.rtTime:
 | 
					                if not ticker.rtTime:
 | 
				
			||||||
                    log.debug(f"New unsent ticker: {ticker}")
 | 
					                    log.debug(f"New unsent ticker: {ticker}")
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                yield proc_ticker(ticker)
 | 
					                    yield normalize(ticker)
 | 
				
			||||||
                    log.debug("Received first real volume tick")
 | 
					                    log.debug("Received first real volume tick")
 | 
				
			||||||
                # XXX: this works because we don't use ``aclosing()`` above?
 | 
					                    # XXX: this works because we don't use
 | 
				
			||||||
 | 
					                    # ``aclosing()`` above?
 | 
				
			||||||
                    break
 | 
					                    break
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            calc_price = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async for ticker in stream:
 | 
					        async for ticker in stream:
 | 
				
			||||||
            yield proc_ticker(ticker)
 | 
					            yield normalize(
 | 
				
			||||||
 | 
					                ticker,
 | 
				
			||||||
 | 
					                calc_price=calc_price
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # ugh, clear ticks since we've consumed them
 | 
					            # ugh, clear ticks since we've consumed them
 | 
				
			||||||
            ticker.ticks = []
 | 
					            ticker.ticks = []
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue