From 2bcebe779a4726baff0ae36fd61139f11003c0cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 15:46:46 -0500 Subject: [PATCH] Add historical backfilling to ib backend --- piker/brokers/ib.py | 186 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 160 insertions(+), 26 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cf509bfb..9731e37f 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ -from contextlib import asynccontextmanager, contextmanager +from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial +from datetime import datetime from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable import asyncio import logging @@ -32,6 +33,7 @@ import itertools import time from async_generator import aclosing +from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails from ib_insync.ticker import Ticker import ib_insync as ibis @@ -45,7 +47,7 @@ from ..data import ( maybe_spawn_brokerd, iterticks, attach_shm_array, - get_shm_token, + # get_shm_token, subscribe_ohlc_for_increment, ) from ..data._source import from_df @@ -86,6 +88,8 @@ _time_frames = { 'Y': 'OneYear', } +_show_wap_in_history = False + # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: @@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = { 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_enters = 0 + class Client: """IB wrapped for our broker backend API. @@ -142,32 +148,54 @@ class Client: self.ib = ib self.ib.RaiseRequestErrors = True + # NOTE: the ib.client here is "throttled" to 45 rps by default + async def bars( self, symbol: str, # EST in ISO 8601 format is required... below is EPOCH - start_date: str = "1970-01-01T00:00:00.000000-05:00", - time_frame: str = '1m', - count: int = int(2e3), # <- max allowed per query - is_paid_feed: bool = False, + start_dt: str = "1970-01-01T00:00:00.000000-05:00", + end_dt: str = "", + + sample_period_s: str = 1, # ohlc sample period + period_count: int = int(2e3), # <- max per 1s sample query + + is_paid_feed: bool = False, # placeholder ) -> List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ bars_kwargs = {'whatToShow': 'TRADES'} + global _enters + print(f'ENTER BARS {_enters}') + _enters += 1 + contract = await self.find_contract(symbol) bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, - endDateTime='', - # durationStr='60 S', - # durationStr='1 D', + endDateTime=end_dt, + + # time history length values format: + # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` + + # OHLC sampling values: + # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, + # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, + # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M + # barSizeSetting='1 secs', + + # durationStr='{count} S'.format(count=15000 * 5), + # durationStr='{count} D'.format(count=1), + # barSizeSetting='5 secs', + + durationStr='{count} S'.format(count=period_count), + barSizeSetting='1 secs', + + # barSizeSetting='1 min', - # time length calcs - durationStr='{count} S'.format(count=5000 * 5), - barSizeSetting='5 secs', # always use extended hours useRTH=False, @@ -181,9 +209,13 @@ class Client: # TODO: raise underlying error here raise ValueError(f"No bars retreived for {symbol}?") + # TODO: rewrite this faster with ``numba`` # convert to pandas dataframe: df = ibis.util.df(bars) - return from_df(df) + return bars, from_df(df) + + def onError(self, reqId, errorCode, errorString, contract) -> None: + breakpoint() async def search_stocks( self, @@ -237,6 +269,8 @@ class Client: """Get an unqualifed contract for the current "continous" future. """ contcon = ibis.ContFuture(symbol, exchange=exchange) + + # it's the "front" contract returned here frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] return ibis.Future(conId=frontcon.conId) @@ -279,10 +313,10 @@ class Client: if exch in ('PURE', 'TSE'): # non-yankee currency = 'CAD' - if exch in ('PURE',): + if exch in ('PURE', 'TSE'): # stupid ib... + primaryExchange = exch exch = 'SMART' - primaryExchange = 'PURE' con = ibis.Stock( symbol=sym, @@ -293,10 +327,27 @@ class Client: try: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] + + head = await self.get_head_time(contract) + print(head) except IndexError: raise ValueError(f"No contract could be found {con}") return contract + async def get_head_time( + self, + contract: Contract, + ) -> datetime: + """Return the first datetime stamp for ``contract``. + + """ + return await self.ib.reqHeadTimeStampAsync( + contract, + whatToShow='TRADES', + useRTH=False, + formatDate=2, # timezone aware UTC datetime + ) + async def stream_ticker( self, symbol: str, @@ -309,7 +360,13 @@ class Client: contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + def push(t): + """Push quotes to trio task. + + """ # log.debug(t) try: to_trio.send_nowait(t) @@ -346,9 +403,17 @@ async def _aio_get_client( """ # first check cache for existing client + # breakpoint() try: - yield _client_cache[(host, port)] - except KeyError: + if port: + client = _client_cache[(host, port)] + else: + # grab first cached client + client = list(_client_cache.values())[0] + + yield client + + except (KeyError, IndexError): # TODO: in case the arbiter has no record # of existing brokerd we need to broadcast for one. @@ -359,9 +424,11 @@ async def _aio_get_client( ib = NonShittyIB() ports = _try_ports if port is None else [port] + _err = None for port in ports: try: + log.info(f"Connecting to the EYEBEE on port {port}!") await ib.connectAsync(host, port, clientId=client_id) break except ConnectionRefusedError as ce: @@ -373,6 +440,7 @@ async def _aio_get_client( try: client = Client(ib) _client_cache[(host, port)] = client + log.debug(f"Caching client for {(host, port)}") yield client except BaseException: ib.disconnect() @@ -385,7 +453,6 @@ async def _aio_run_client_method( from_trio=None, **kwargs, ) -> None: - log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") async with _aio_get_client() as client: async_meth = getattr(client, meth) @@ -402,6 +469,9 @@ async def _trio_run_client_method( method: str, **kwargs, ) -> None: + """Asyncio entry point to run tasks against the ``ib_insync`` api. + + """ ca = tractor.current_actor() assert ca.is_infected_aio() @@ -530,18 +600,60 @@ def normalize( _local_buffer_writers = {} -@contextmanager -def activate_writer(key: str): +@asynccontextmanager +async def activate_writer(key: str) -> (bool, trio.Nursery): try: writer_already_exists = _local_buffer_writers.get(key, False) + if not writer_already_exists: _local_buffer_writers[key] = True - yield writer_already_exists + async with trio.open_nursery() as n: + yield writer_already_exists, n + else: + yield writer_already_exists, None finally: _local_buffer_writers.pop(key, None) +async def fill_bars( + first_bars, + shm, + count: int = 21, +) -> None: + """Fill historical bars into shared mem / storage afap. + + TODO: avoid pacing constraints: + https://github.com/pikers/piker/issues/128 + + """ + next_dt = first_bars[0].date + + i = 0 + while i < count: + + try: + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol='.'.join( + (first_bars.contract.symbol, first_bars.contract.exchange) + ), + end_dt=next_dt, + + ) + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars[0].date + + except RequestError as err: + # TODO: retreive underlying ``ib_insync`` error~~ + if err.code == 162: + log.exception( + "Data query rate reached: Press `ctrl-alt-f` in TWS") + + await tractor.breakpoint() + + # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. # @tractor.msg.pub @@ -575,7 +687,9 @@ async def stream_quotes( # check if a writer already is alive in a streaming task, # otherwise start one and mark it as now existing - with activate_writer(shm_token['shm_name']) as writer_already_exists: + async with activate_writer( + shm_token['shm_name'] + ) as (writer_already_exists, ln): # maybe load historical ohlcv in to shared mem # check if shm has already been created by previous @@ -588,18 +702,29 @@ async def stream_quotes( # we are the buffer writer readonly=False, ) - bars = await _trio_run_client_method( + + # async def retrieve_and_push(): + start = time.time() + + bars, bars_array = await _trio_run_client_method( method='bars', symbol=sym, + ) - if bars is None: + log.info(f"bars_array request: {time.time() - start}") + + if bars_array is None: raise SymbolNotFound(sym) # write historical data to buffer - shm.push(bars) + shm.push(bars_array) shm_token = shm.token + # TODO: generalize this for other brokers + # start bar filler task in bg + ln.start_soon(fill_bars, bars, shm) + times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] subscribe_ohlc_for_increment(shm, delay_s) @@ -656,6 +781,7 @@ async def stream_quotes( # real-time stream async for ticker in stream: + # print(ticker.vwap) quote = normalize( ticker, calc_price=calc_price @@ -674,6 +800,8 @@ async def stream_quotes( for tick in iterticks(quote, types=('trade', 'utrade',)): last = tick['price'] + # print(f"{quote['symbol']}: {tick}") + # update last entry # benchmarked in the 4-5 us range o, high, low, v = shm.array[-1][ @@ -687,7 +815,13 @@ async def stream_quotes( # is also the close/last trade price o = last - shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = ( + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( o, max(high, last), min(low, last),