Add historical backfilling to ib backend

vwap_backup
Tyler Goodlet 2020-12-10 15:46:46 -05:00
parent 3ee4fe7d56
commit 6bae50ba2e
1 changed files with 160 additions and 26 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from contextlib import asynccontextmanager, contextmanager
from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio
import logging
@ -32,6 +33,7 @@ import itertools
import time
from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
get_shm_token,
# get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear',
}
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_enters = 0
class Client:
"""IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib
self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m',
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False,
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
end_dt: str = "",
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
# durationStr='1 D',
endDateTime=end_dt,
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours
useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
return from_df(df)
return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks(
self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD'
if exch in ('PURE',):
if exch in ('PURE', 'TSE'):
# stupid ib...
primaryExchange = exch
exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock(
symbol=sym,
@ -293,10 +327,27 @@ class Client:
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker(
self,
symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
"""
# first check cache for existing client
# breakpoint()
try:
yield _client_cache[(host, port)]
except KeyError:
if port:
client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
_err = None
for port in ports:
try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try:
client = Client(ib)
_client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client
except BaseException:
ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:
async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor()
assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {}
@contextmanager
def activate_writer(key: str):
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
yield writer_already_exists
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists:
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer
readonly=False,
)
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
)
if bars is None:
log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym)
# write historical data to buffer
shm.push(bars)
shm.push(bars_array)
shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price
o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),