Add historical backfilling to ib backend
							parent
							
								
									4bb02ded2e
								
							
						
					
					
						commit
						2bcebe779a
					
				| 
						 | 
				
			
			@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
 | 
			
		|||
built on it) and thus actor aware API calls must be spawned with
 | 
			
		||||
``infected_aio==True``.
 | 
			
		||||
"""
 | 
			
		||||
from contextlib import asynccontextmanager, contextmanager
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from dataclasses import asdict
 | 
			
		||||
from functools import partial
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
 | 
			
		||||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
| 
						 | 
				
			
			@ -32,6 +33,7 @@ import itertools
 | 
			
		|||
import time
 | 
			
		||||
 | 
			
		||||
from async_generator import aclosing
 | 
			
		||||
from ib_insync.wrapper import RequestError
 | 
			
		||||
from ib_insync.contract import Contract, ContractDetails
 | 
			
		||||
from ib_insync.ticker import Ticker
 | 
			
		||||
import ib_insync as ibis
 | 
			
		||||
| 
						 | 
				
			
			@ -45,7 +47,7 @@ from ..data import (
 | 
			
		|||
    maybe_spawn_brokerd,
 | 
			
		||||
    iterticks,
 | 
			
		||||
    attach_shm_array,
 | 
			
		||||
    get_shm_token,
 | 
			
		||||
    # get_shm_token,
 | 
			
		||||
    subscribe_ohlc_for_increment,
 | 
			
		||||
)
 | 
			
		||||
from ..data._source import from_df
 | 
			
		||||
| 
						 | 
				
			
			@ -86,6 +88,8 @@ _time_frames = {
 | 
			
		|||
    'Y': 'OneYear',
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
_show_wap_in_history = False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# overrides to sidestep pretty questionable design decisions in
 | 
			
		||||
# ``ib_insync``:
 | 
			
		||||
| 
						 | 
				
			
			@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
 | 
			
		|||
    'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
_enters = 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Client:
 | 
			
		||||
    """IB wrapped for our broker backend API.
 | 
			
		||||
| 
						 | 
				
			
			@ -142,32 +148,54 @@ class Client:
 | 
			
		|||
        self.ib = ib
 | 
			
		||||
        self.ib.RaiseRequestErrors = True
 | 
			
		||||
 | 
			
		||||
        # NOTE: the ib.client here is "throttled" to 45 rps by default
 | 
			
		||||
 | 
			
		||||
    async def bars(
 | 
			
		||||
        self,
 | 
			
		||||
        symbol: str,
 | 
			
		||||
        # EST in ISO 8601 format is required... below is EPOCH
 | 
			
		||||
        start_date: str = "1970-01-01T00:00:00.000000-05:00",
 | 
			
		||||
        time_frame: str = '1m',
 | 
			
		||||
        count: int = int(2e3),  # <- max allowed per query
 | 
			
		||||
        is_paid_feed: bool = False,
 | 
			
		||||
        start_dt: str = "1970-01-01T00:00:00.000000-05:00",
 | 
			
		||||
        end_dt: str = "",
 | 
			
		||||
 | 
			
		||||
        sample_period_s: str = 1,  # ohlc sample period
 | 
			
		||||
        period_count: int = int(2e3),  # <- max per 1s sample query
 | 
			
		||||
 | 
			
		||||
        is_paid_feed: bool = False,  # placeholder
 | 
			
		||||
    ) -> List[Dict[str, Any]]:
 | 
			
		||||
        """Retreive OHLCV bars for a symbol over a range to the present.
 | 
			
		||||
        """
 | 
			
		||||
        bars_kwargs = {'whatToShow': 'TRADES'}
 | 
			
		||||
 | 
			
		||||
        global _enters
 | 
			
		||||
        print(f'ENTER BARS {_enters}')
 | 
			
		||||
        _enters += 1
 | 
			
		||||
 | 
			
		||||
        contract = await self.find_contract(symbol)
 | 
			
		||||
        bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
 | 
			
		||||
 | 
			
		||||
        # _min = min(2000*100, count)
 | 
			
		||||
        bars = await self.ib.reqHistoricalDataAsync(
 | 
			
		||||
            contract,
 | 
			
		||||
            endDateTime='',
 | 
			
		||||
            # durationStr='60 S',
 | 
			
		||||
            # durationStr='1 D',
 | 
			
		||||
            endDateTime=end_dt,
 | 
			
		||||
 | 
			
		||||
            # time history length values format:
 | 
			
		||||
            # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
 | 
			
		||||
 | 
			
		||||
            # OHLC sampling values:
 | 
			
		||||
            # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
 | 
			
		||||
            # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
 | 
			
		||||
            # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
 | 
			
		||||
            # barSizeSetting='1 secs',
 | 
			
		||||
 | 
			
		||||
            # durationStr='{count} S'.format(count=15000 * 5),
 | 
			
		||||
            # durationStr='{count} D'.format(count=1),
 | 
			
		||||
            # barSizeSetting='5 secs',
 | 
			
		||||
 | 
			
		||||
            durationStr='{count} S'.format(count=period_count),
 | 
			
		||||
            barSizeSetting='1 secs',
 | 
			
		||||
 | 
			
		||||
            # barSizeSetting='1 min',
 | 
			
		||||
 | 
			
		||||
            # time length calcs
 | 
			
		||||
            durationStr='{count} S'.format(count=5000 * 5),
 | 
			
		||||
            barSizeSetting='5 secs',
 | 
			
		||||
 | 
			
		||||
            # always use extended hours
 | 
			
		||||
            useRTH=False,
 | 
			
		||||
| 
						 | 
				
			
			@ -181,9 +209,13 @@ class Client:
 | 
			
		|||
            # TODO: raise underlying error here
 | 
			
		||||
            raise ValueError(f"No bars retreived for {symbol}?")
 | 
			
		||||
 | 
			
		||||
        # TODO: rewrite this faster with ``numba``
 | 
			
		||||
        # convert to pandas dataframe:
 | 
			
		||||
        df = ibis.util.df(bars)
 | 
			
		||||
        return from_df(df)
 | 
			
		||||
        return bars, from_df(df)
 | 
			
		||||
 | 
			
		||||
    def onError(self, reqId, errorCode, errorString, contract) -> None:
 | 
			
		||||
        breakpoint()
 | 
			
		||||
 | 
			
		||||
    async def search_stocks(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -237,6 +269,8 @@ class Client:
 | 
			
		|||
        """Get an unqualifed contract for the current "continous" future.
 | 
			
		||||
        """
 | 
			
		||||
        contcon = ibis.ContFuture(symbol, exchange=exchange)
 | 
			
		||||
 | 
			
		||||
        # it's the "front" contract returned here
 | 
			
		||||
        frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
 | 
			
		||||
        return ibis.Future(conId=frontcon.conId)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -279,10 +313,10 @@ class Client:
 | 
			
		|||
 | 
			
		||||
            if exch in ('PURE', 'TSE'):  # non-yankee
 | 
			
		||||
                currency = 'CAD'
 | 
			
		||||
                if exch in ('PURE',):
 | 
			
		||||
                if exch in ('PURE', 'TSE'):
 | 
			
		||||
                    # stupid ib...
 | 
			
		||||
                    primaryExchange = exch
 | 
			
		||||
                    exch = 'SMART'
 | 
			
		||||
                    primaryExchange = 'PURE'
 | 
			
		||||
 | 
			
		||||
            con = ibis.Stock(
 | 
			
		||||
                symbol=sym,
 | 
			
		||||
| 
						 | 
				
			
			@ -293,10 +327,27 @@ class Client:
 | 
			
		|||
        try:
 | 
			
		||||
            exch = 'SMART' if not exch else exch
 | 
			
		||||
            contract = (await self.ib.qualifyContractsAsync(con))[0]
 | 
			
		||||
 | 
			
		||||
            head = await self.get_head_time(contract)
 | 
			
		||||
            print(head)
 | 
			
		||||
        except IndexError:
 | 
			
		||||
            raise ValueError(f"No contract could be found {con}")
 | 
			
		||||
        return contract
 | 
			
		||||
 | 
			
		||||
    async def get_head_time(
 | 
			
		||||
        self,
 | 
			
		||||
        contract: Contract,
 | 
			
		||||
    ) -> datetime:
 | 
			
		||||
        """Return the first datetime stamp for ``contract``.
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        return await self.ib.reqHeadTimeStampAsync(
 | 
			
		||||
            contract,
 | 
			
		||||
            whatToShow='TRADES',
 | 
			
		||||
            useRTH=False,
 | 
			
		||||
            formatDate=2,  # timezone aware UTC datetime
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async def stream_ticker(
 | 
			
		||||
        self,
 | 
			
		||||
        symbol: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -309,7 +360,13 @@ class Client:
 | 
			
		|||
        contract = await self.find_contract(symbol)
 | 
			
		||||
        ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
 | 
			
		||||
 | 
			
		||||
        # define a simple queue push routine that streams quote packets
 | 
			
		||||
        # to trio over the ``to_trio`` memory channel.
 | 
			
		||||
 | 
			
		||||
        def push(t):
 | 
			
		||||
            """Push quotes to trio task.
 | 
			
		||||
 | 
			
		||||
            """
 | 
			
		||||
            # log.debug(t)
 | 
			
		||||
            try:
 | 
			
		||||
                to_trio.send_nowait(t)
 | 
			
		||||
| 
						 | 
				
			
			@ -346,9 +403,17 @@ async def _aio_get_client(
 | 
			
		|||
    """
 | 
			
		||||
    # first check cache for existing client
 | 
			
		||||
 | 
			
		||||
    # breakpoint()
 | 
			
		||||
    try:
 | 
			
		||||
        yield _client_cache[(host, port)]
 | 
			
		||||
    except KeyError:
 | 
			
		||||
        if port:
 | 
			
		||||
            client = _client_cache[(host, port)]
 | 
			
		||||
        else:
 | 
			
		||||
            # grab first cached client
 | 
			
		||||
            client = list(_client_cache.values())[0]
 | 
			
		||||
 | 
			
		||||
        yield client
 | 
			
		||||
 | 
			
		||||
    except (KeyError, IndexError):
 | 
			
		||||
        # TODO: in case the arbiter has no record
 | 
			
		||||
        # of existing brokerd we need to broadcast for one.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -359,9 +424,11 @@ async def _aio_get_client(
 | 
			
		|||
 | 
			
		||||
        ib = NonShittyIB()
 | 
			
		||||
        ports = _try_ports if port is None else [port]
 | 
			
		||||
 | 
			
		||||
        _err = None
 | 
			
		||||
        for port in ports:
 | 
			
		||||
            try:
 | 
			
		||||
                log.info(f"Connecting to the EYEBEE on port {port}!")
 | 
			
		||||
                await ib.connectAsync(host, port, clientId=client_id)
 | 
			
		||||
                break
 | 
			
		||||
            except ConnectionRefusedError as ce:
 | 
			
		||||
| 
						 | 
				
			
			@ -373,6 +440,7 @@ async def _aio_get_client(
 | 
			
		|||
        try:
 | 
			
		||||
            client = Client(ib)
 | 
			
		||||
            _client_cache[(host, port)] = client
 | 
			
		||||
            log.debug(f"Caching client for {(host, port)}")
 | 
			
		||||
            yield client
 | 
			
		||||
        except BaseException:
 | 
			
		||||
            ib.disconnect()
 | 
			
		||||
| 
						 | 
				
			
			@ -385,7 +453,6 @@ async def _aio_run_client_method(
 | 
			
		|||
    from_trio=None,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
) -> None:
 | 
			
		||||
    log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
 | 
			
		||||
    async with _aio_get_client() as client:
 | 
			
		||||
 | 
			
		||||
        async_meth = getattr(client, meth)
 | 
			
		||||
| 
						 | 
				
			
			@ -402,6 +469,9 @@ async def _trio_run_client_method(
 | 
			
		|||
    method: str,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Asyncio entry point to run tasks against the ``ib_insync`` api.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    ca = tractor.current_actor()
 | 
			
		||||
    assert ca.is_infected_aio()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -530,18 +600,60 @@ def normalize(
 | 
			
		|||
_local_buffer_writers = {}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@contextmanager
 | 
			
		||||
def activate_writer(key: str):
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def activate_writer(key: str) -> (bool, trio.Nursery):
 | 
			
		||||
    try:
 | 
			
		||||
        writer_already_exists = _local_buffer_writers.get(key, False)
 | 
			
		||||
 | 
			
		||||
        if not writer_already_exists:
 | 
			
		||||
            _local_buffer_writers[key] = True
 | 
			
		||||
 | 
			
		||||
        yield writer_already_exists
 | 
			
		||||
            async with trio.open_nursery() as n:
 | 
			
		||||
                yield writer_already_exists, n
 | 
			
		||||
        else:
 | 
			
		||||
            yield writer_already_exists, None
 | 
			
		||||
    finally:
 | 
			
		||||
        _local_buffer_writers.pop(key, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def fill_bars(
 | 
			
		||||
    first_bars,
 | 
			
		||||
    shm,
 | 
			
		||||
    count: int = 21,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Fill historical bars into shared mem / storage afap.
 | 
			
		||||
 | 
			
		||||
    TODO: avoid pacing constraints:
 | 
			
		||||
    https://github.com/pikers/piker/issues/128
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    next_dt = first_bars[0].date
 | 
			
		||||
 | 
			
		||||
    i = 0
 | 
			
		||||
    while i < count:
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            bars, bars_array = await _trio_run_client_method(
 | 
			
		||||
                method='bars',
 | 
			
		||||
                symbol='.'.join(
 | 
			
		||||
                    (first_bars.contract.symbol, first_bars.contract.exchange)
 | 
			
		||||
                ),
 | 
			
		||||
                end_dt=next_dt,
 | 
			
		||||
 | 
			
		||||
            )
 | 
			
		||||
            shm.push(bars_array, prepend=True)
 | 
			
		||||
            i += 1
 | 
			
		||||
            next_dt = bars[0].date
 | 
			
		||||
 | 
			
		||||
        except RequestError as err:
 | 
			
		||||
            # TODO: retreive underlying ``ib_insync`` error~~
 | 
			
		||||
            if err.code == 162:
 | 
			
		||||
                log.exception(
 | 
			
		||||
                    "Data query rate reached: Press `ctrl-alt-f` in TWS")
 | 
			
		||||
 | 
			
		||||
                await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: figure out how to share quote feeds sanely despite
 | 
			
		||||
# the wacky ``ib_insync`` api.
 | 
			
		||||
# @tractor.msg.pub
 | 
			
		||||
| 
						 | 
				
			
			@ -575,7 +687,9 @@ async def stream_quotes(
 | 
			
		|||
 | 
			
		||||
        # check if a writer already is alive in a streaming task,
 | 
			
		||||
        # otherwise start one and mark it as now existing
 | 
			
		||||
        with activate_writer(shm_token['shm_name']) as writer_already_exists:
 | 
			
		||||
        async with activate_writer(
 | 
			
		||||
            shm_token['shm_name']
 | 
			
		||||
        ) as (writer_already_exists, ln):
 | 
			
		||||
 | 
			
		||||
            # maybe load historical ohlcv in to shared mem
 | 
			
		||||
            # check if shm has already been created by previous
 | 
			
		||||
| 
						 | 
				
			
			@ -588,18 +702,29 @@ async def stream_quotes(
 | 
			
		|||
                    # we are the buffer writer
 | 
			
		||||
                    readonly=False,
 | 
			
		||||
                )
 | 
			
		||||
                bars = await _trio_run_client_method(
 | 
			
		||||
 | 
			
		||||
                # async def retrieve_and_push():
 | 
			
		||||
                start = time.time()
 | 
			
		||||
 | 
			
		||||
                bars, bars_array = await _trio_run_client_method(
 | 
			
		||||
                    method='bars',
 | 
			
		||||
                    symbol=sym,
 | 
			
		||||
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                if bars is None:
 | 
			
		||||
                log.info(f"bars_array request: {time.time() - start}")
 | 
			
		||||
 | 
			
		||||
                if bars_array is None:
 | 
			
		||||
                    raise SymbolNotFound(sym)
 | 
			
		||||
 | 
			
		||||
                # write historical data to buffer
 | 
			
		||||
                shm.push(bars)
 | 
			
		||||
                shm.push(bars_array)
 | 
			
		||||
                shm_token = shm.token
 | 
			
		||||
 | 
			
		||||
                # TODO: generalize this for other brokers
 | 
			
		||||
                # start bar filler task in bg
 | 
			
		||||
                ln.start_soon(fill_bars, bars, shm)
 | 
			
		||||
 | 
			
		||||
                times = shm.array['time']
 | 
			
		||||
                delay_s = times[-1] - times[times != times[-1]][-1]
 | 
			
		||||
                subscribe_ohlc_for_increment(shm, delay_s)
 | 
			
		||||
| 
						 | 
				
			
			@ -656,6 +781,7 @@ async def stream_quotes(
 | 
			
		|||
 | 
			
		||||
            # real-time stream
 | 
			
		||||
            async for ticker in stream:
 | 
			
		||||
                # print(ticker.vwap)
 | 
			
		||||
                quote = normalize(
 | 
			
		||||
                    ticker,
 | 
			
		||||
                    calc_price=calc_price
 | 
			
		||||
| 
						 | 
				
			
			@ -674,6 +800,8 @@ async def stream_quotes(
 | 
			
		|||
                    for tick in iterticks(quote, types=('trade', 'utrade',)):
 | 
			
		||||
                        last = tick['price']
 | 
			
		||||
 | 
			
		||||
                        # print(f"{quote['symbol']}: {tick}")
 | 
			
		||||
 | 
			
		||||
                        # update last entry
 | 
			
		||||
                        # benchmarked in the 4-5 us range
 | 
			
		||||
                        o, high, low, v = shm.array[-1][
 | 
			
		||||
| 
						 | 
				
			
			@ -687,7 +815,13 @@ async def stream_quotes(
 | 
			
		|||
                            # is also the close/last trade price
 | 
			
		||||
                            o = last
 | 
			
		||||
 | 
			
		||||
                        shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
 | 
			
		||||
                        shm.array[[
 | 
			
		||||
                            'open',
 | 
			
		||||
                            'high',
 | 
			
		||||
                            'low',
 | 
			
		||||
                            'close',
 | 
			
		||||
                            'volume',
 | 
			
		||||
                        ]][-1] = (
 | 
			
		||||
                            o,
 | 
			
		||||
                            max(high, last),
 | 
			
		||||
                            min(low, last),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue