Add historical backfilling to ib backend

to_qpainterpath_and_beyond
Tyler Goodlet 2020-12-10 15:46:46 -05:00
parent 6d50ad75a7
commit fda9fcbc55
1 changed files with 160 additions and 26 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from contextlib import asynccontextmanager, contextmanager from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio import asyncio
import logging import logging
@ -32,6 +33,7 @@ import itertools
import time import time
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
iterticks, iterticks,
attach_shm_array, attach_shm_array,
get_shm_token, # get_shm_token,
subscribe_ohlc_for_increment, subscribe_ohlc_for_increment,
) )
from ..data._source import from_df from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear', 'Y': 'OneYear',
} }
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in # overrides to sidestep pretty questionable design decisions in
# ``ib_insync``: # ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
} }
_enters = 0
class Client: class Client:
"""IB wrapped for our broker backend API. """IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars( async def bars(
self, self,
symbol: str, symbol: str,
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00", start_dt: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m', end_dt: str = "",
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False, sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count) # _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
contract, contract,
endDateTime='', endDateTime=end_dt,
# durationStr='60 S',
# durationStr='1 D', # time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours # always use extended hours
useRTH=False, useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here # TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?") raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe: # convert to pandas dataframe:
df = ibis.util.df(bars) df = ibis.util.df(bars)
return from_df(df) return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks( async def search_stocks(
self, self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future. """Get an unqualifed contract for the current "continous" future.
""" """
contcon = ibis.ContFuture(symbol, exchange=exchange) contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId) return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD' currency = 'CAD'
if exch in ('PURE',): if exch in ('PURE', 'TSE'):
# stupid ib... # stupid ib...
primaryExchange = exch
exch = 'SMART' exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock( con = ibis.Stock(
symbol=sym, symbol=sym,
@ -293,10 +327,27 @@ class Client:
try: try:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
return contract return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker( async def stream_ticker(
self, self,
symbol: str, symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t): def push(t):
"""Push quotes to trio task.
"""
# log.debug(t) # log.debug(t)
try: try:
to_trio.send_nowait(t) to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
""" """
# first check cache for existing client # first check cache for existing client
# breakpoint()
try: try:
yield _client_cache[(host, port)] if port:
except KeyError: client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record # TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one. # of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB() ib = NonShittyIB()
ports = _try_ports if port is None else [port] ports = _try_ports if port is None else [port]
_err = None _err = None
for port in ports: for port in ports:
try: try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(host, port, clientId=client_id)
break break
except ConnectionRefusedError as ce: except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
ib.disconnect() ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None, from_trio=None,
**kwargs, **kwargs,
) -> None: ) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client: async with _aio_get_client() as client:
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str, method: str,
**kwargs, **kwargs,
) -> None: ) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor() ca = tractor.current_actor()
assert ca.is_infected_aio() assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {} _local_buffer_writers = {}
@contextmanager @asynccontextmanager
def activate_writer(key: str): async def activate_writer(key: str) -> (bool, trio.Nursery):
try: try:
writer_already_exists = _local_buffer_writers.get(key, False) writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists: if not writer_already_exists:
_local_buffer_writers[key] = True _local_buffer_writers[key] = True
yield writer_already_exists async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally: finally:
_local_buffer_writers.pop(key, None) _local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task, # check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing # otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists: async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem # maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous # check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer # we are the buffer writer
readonly=False, readonly=False,
) )
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars', method='bars',
symbol=sym, symbol=sym,
) )
if bars is None: log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym) raise SymbolNotFound(sym)
# write historical data to buffer # write historical data to buffer
shm.push(bars) shm.push(bars_array)
shm_token = shm.token shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream # real-time stream
async for ticker in stream: async for ticker in stream:
# print(ticker.vwap)
quote = normalize( quote = normalize(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)): for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price'] last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price # is also the close/last trade price
o = last o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = ( shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),