Add normalization step for ticks

Start a draft normalization format for (sampled) tick data.
Ideally we move toward the dense tick format (DFT) enforced by
techtonicDB, but for now let's just get a dict of something simple
going: `{'type': 'trade', 'price': <price}` kind of thing. This
gets us started being able to real-time chart from all data feed
back-ends. Oh, and hack in support for XAUUSD..and get subactor
logging workin.
ib_backend
Tyler Goodlet 2020-07-31 00:03:17 -04:00
parent aeb58c03e2
commit 482dc510fa
1 changed files with 101 additions and 33 deletions

View File

@ -97,6 +97,15 @@ class NonShittyIB(ibis.IB):
self._logger = logging.getLogger('ib_insync.ib') self._logger = logging.getLogger('ib_insync.ib')
# map of symbols to contract ids
_adhoc_cmdty_data_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: cmdtys don't have trade data:
# https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
class Client: class Client:
"""IB wrapped for our broker backend API. """IB wrapped for our broker backend API.
@ -119,20 +128,34 @@ class Client:
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
bars_kwargs = {'whatToShow': 'TRADES'}
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count) # _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
contract, contract,
endDateTime='', endDateTime='',
# durationStr='60 S', # durationStr='60 S',
# durationStr='1 D', # durationStr='1 D',
# time length calcs
durationStr='{count} S'.format(count=3000 * 5), durationStr='{count} S'.format(count=3000 * 5),
barSizeSetting='5 secs', barSizeSetting='5 secs',
whatToShow='TRADES',
useRTH=False # always use extended hours
useRTH=False,
# restricted per contract type
**bars_kwargs,
# whatToShow='MIDPOINT',
# whatToShow='TRADES',
) )
# TODO: raise underlying error here if not bars:
assert bars # TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
# convert to pandas dataframe: # convert to pandas dataframe:
df = ibis.util.df(bars) df = ibis.util.df(bars)
return from_df(df) return from_df(df)
@ -205,8 +228,9 @@ class Client:
con = await self.get_cont_fute(symbol=sym, exchange=exch) con = await self.get_cont_fute(symbol=sym, exchange=exch)
elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY
con = ibis.Commodity(symbol=sym) con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym]
con = ibis.Commodity(**con_kwargs)
con.bars_kwargs = bars_kwargs
else: else:
con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) con = ibis.Stock(symbol=sym, exchange=exch, currency=currency)
@ -222,12 +246,19 @@ class Client:
symbol: str, symbol: str,
to_trio, to_trio,
opts: Tuple[int] = ('233', '375'), opts: Tuple[int] = ('233', '375'),
# opts: Tuple[int] = ('459',),
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
def push(t):
log.debug(t)
to_trio.send_nowait(t)
ticker.updateEvent.connect(push)
# let the engine run and stream # let the engine run and stream
await self.ib.disconnectedEvent await self.ib.disconnectedEvent
@ -371,15 +402,52 @@ async def get_client(
yield get_method_proxy(portal, Client) yield get_method_proxy(portal, Client)
def normalize(
ticker: Ticker,
calc_price: bool = False
) -> dict:
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
td = tick._asdict()
if td['tickType'] in (48, 77):
td['type'] = 'trade'
new_ticks.append(td)
ticker.ticks = new_ticks
# some contracts don't have volume so we may want to
# calculate a midpoint price based on data we can acquire
# (such as bid / ask)
if calc_price:
ticker.ticks.append(
{'type': 'trade', 'price': ticker.marketPrice()}
)
# serialize for transport
data = asdict(ticker)
# add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time()
if ticker.rtTime:
data['rtTime_s'] = float(ticker.rtTime) / 1000.
return data
async def stream_quotes( async def stream_quotes(
symbols: List[str], symbols: List[str],
loglevel: str = None,
) -> AsyncGenerator[str, Dict[str, Any]]: ) -> AsyncGenerator[str, Dict[str, Any]]:
"""Stream symbol quotes. """Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked This is a ``trio`` callable routine meant to be invoked
once the brokerd is up. once the brokerd is up.
""" """
get_console_log('info') # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
stream = await tractor.to_asyncio.run_task( stream = await tractor.to_asyncio.run_task(
_trio_run_client_method, _trio_run_client_method,
@ -389,36 +457,36 @@ async def stream_quotes(
async with aclosing(stream): async with aclosing(stream):
# first quote can be ignored as a 2nd with newer data is sent? # first quote can be ignored as a 2nd with newer data is sent?
first_ticker = await stream.__anext__() first_ticker = await stream.__anext__()
data = asdict(first_ticker) # quote_cache = {}
log.debug(f"First ticker received {data}")
yield data
quote_cache = {}
def proc_ticker(ticker: Ticker) -> dict: if type(first_ticker.contract) not in (ibis.Commodity,):
# convert named tuples to dicts so we send usable keys
ticker.ticks = [td._asdict() for td in ticker.ticks]
data = asdict(ticker)
# add time stamps for downstream latency measurements calc_price = False # should be real volume for contract
data['brokerd_ts'] = time.time()
if ticker.rtTime:
data['rtTime_s'] = float(ticker.rtTime) / 1000.
return data data = normalize(first_ticker)
log.debug(f"First ticker received {data}")
yield data
async for ticker in stream:
# spin consuming tickers until we get a real market datum
if not ticker.rtTime:
log.debug(f"New unsent ticker: {ticker}")
continue
else:
yield normalize(ticker)
log.debug("Received first real volume tick")
# XXX: this works because we don't use
# ``aclosing()`` above?
break
else:
calc_price = True
async for ticker in stream: async for ticker in stream:
# spin consuming tickers until we get a real market datum yield normalize(
if not ticker.rtTime: ticker,
log.debug(f"New unsent ticker: {ticker}") calc_price=calc_price
continue )
else:
yield proc_ticker(ticker)
log.debug("Received first real volume tick")
# XXX: this works because we don't use ``aclosing()`` above?
break
async for ticker in stream:
yield proc_ticker(ticker)
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []