Add better contract search/lookup

Add a `Client.find_contract()` which internally takes
a <symbol>.<exchange> str as input and uses `IB.qualifyContractsAsync()`
internally to try and validate the most likely contract. Make the module
script call this using `asyncio.run()` for console testing.
ib_backend
Tyler Goodlet 2020-07-07 10:33:47 -04:00
parent b8209cd506
commit 450a39ce1c
1 changed files with 78 additions and 45 deletions

View File

@ -24,9 +24,16 @@ from ..log import get_logger, get_console_log
log = get_logger(__name__) log = get_logger(__name__)
_time_units = {
's': ' sec',
'm': ' mins',
'h': ' hours',
}
_time_frames = { _time_frames = {
'1s': '1 Sec', '1s': '1 Sec',
'5s': '5 Sec',
'30s': '30 Sec',
'1m': 'OneMinute', '1m': 'OneMinute',
'2m': 'TwoMinutes', '2m': 'TwoMinutes',
'3m': 'ThreeMinutes', '3m': 'ThreeMinutes',
@ -63,33 +70,31 @@ class Client:
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00", start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m', time_frame: str = '1m',
count: int = int(20e3), # <- max allowed per query count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False, is_paid_feed: bool = False,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
contract = ibis.ContFuture('ES', exchange='GLOBEX') contract = await self.find_contract(symbol)
# contract = ibis.Stock('WEED', 'SMART', 'CAD') # _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
contract, contract,
endDateTime='', endDateTime='',
# durationStr='60 S', # durationStr='60 S',
durationStr='2000 S', # durationStr='1 D',
barSizeSetting='1 secs', durationStr='{count} S'.format(count=3000*5),
barSizeSetting='5 secs',
whatToShow='TRADES', whatToShow='TRADES',
useRTH=False useRTH=False
) )
assert bars
# barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True)
# convert to pandas dataframe: # convert to pandas dataframe:
df = ibis.util.df(bars) df = ibis.util.df(bars)
print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) # print(df[['date', 'open', 'high', 'low', 'close', 'volume']])
from piker.ui._source import from_df from piker.ui._source import from_df
a = from_df(df) return from_df(df)
# breakpoint()
print(a)
# TODO: reimplement this using async batch requests
# see https://github.com/erdewit/ib_insync/issues/262
async def search_stocks( async def search_stocks(
self, self,
pattern: str, pattern: str,
@ -134,11 +139,41 @@ class Client:
) -> Dict[str, ContractDetails]: ) -> Dict[str, ContractDetails]:
raise NotImplementedError raise NotImplementedError
def get_cont_fute( async def get_cont_fute(
self, self,
symbol: str, symbol: str,
exchange: str,
) -> Contract: ) -> Contract:
raise NotImplementedError """Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
async def find_contract(
self,
symbol,
currency: str = 'USD',
**kwargs,
) -> Contract:
# use heuristics to figure out contract "type"
sym, exch = symbol.upper().split('.')
if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'):
con = await self.get_cont_fute(symbol=sym, exchange=exch)
elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY
con = ibis.Commodity(symbol=sym)
else:
con = ibis.Stock(symbol=sym, exchange=exch, currency=currency)
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract
async def stream_ticker( async def stream_ticker(
self, self,
@ -148,8 +183,7 @@ class Client:
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
sym, exch = symbol.split('.') contract = await self.find_contract(symbol)
contract = ibis.Stock(sym.upper(), exchange=exch.upper())
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
@ -160,6 +194,8 @@ class Client:
# default config ports # default config ports
_tws_port: int = 7497 _tws_port: int = 7497
_gw_port: int = 4002 _gw_port: int = 4002
# list of ports to try in order
_try_ports = [_tws_port, _gw_port]
@asynccontextmanager @asynccontextmanager
@ -171,21 +207,15 @@ async def _aio_get_client(
"""Return an ``ib_insync.IB`` instance wrapped in our client API. """Return an ``ib_insync.IB`` instance wrapped in our client API.
""" """
ib = ibis.IB() ib = ibis.IB()
ports = _try_ports if port is None else [port]
if port is None:
ports = [_tws_port, _gw_port]
else:
ports = [port]
_err = None _err = None
# try all default ports
for port in ports: for port in ports:
try: try:
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(host, port, clientId=client_id)
break break
except ConnectionRefusedError as ce: except ConnectionRefusedError as ce:
_err = ce _err = ce
print(f'failed to connect on {port}') log.warning(f'Failed to connect on {port}')
else: else:
raise ConnectionRefusedError(_err) raise ConnectionRefusedError(_err)
@ -198,8 +228,8 @@ async def _aio_get_client(
async def _aio_run_client_method( async def _aio_run_client_method(
meth: str, meth: str,
to_trio, to_trio=None,
from_trio, from_trio=None,
**kwargs, **kwargs,
) -> None: ) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
@ -209,7 +239,7 @@ async def _aio_run_client_method(
# handle streaming methods # handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args) args = tuple(inspect.getfullargspec(async_meth).args)
if 'to_trio' in args: if to_trio and 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
return await async_meth(**kwargs) return await async_meth(**kwargs)
@ -222,13 +252,13 @@ async def _trio_run_client_method(
ca = tractor.current_actor() ca = tractor.current_actor()
assert ca.is_infected_aio() assert ca.is_infected_aio()
# if the method is an async gen stream for it # if the method is an *async gen* stream for it
meth = getattr(Client, method) meth = getattr(Client, method)
if inspect.isasyncgenfunction(meth): if inspect.isasyncgenfunction(meth):
kwargs['_treat_as_stream'] = True kwargs['_treat_as_stream'] = True
# if the method is an async func but streams back results # if the method is an *async func* but manually
# make sure to also stream from it # streams back results, make sure to also stream it
args = tuple(inspect.getfullargspec(meth).args) args = tuple(inspect.getfullargspec(meth).args)
if 'to_trio' in args: if 'to_trio' in args:
kwargs['_treat_as_stream'] = True kwargs['_treat_as_stream'] = True
@ -241,7 +271,7 @@ async def _trio_run_client_method(
return result return result
def get_method_proxy(portal): def get_method_proxy(portal, target):
class MethodProxy: class MethodProxy:
def __init__(self, portal: tractor._portal.Portal): def __init__(self, portal: tractor._portal.Portal):
@ -264,7 +294,7 @@ def get_method_proxy(portal):
# mock all remote methods # mock all remote methods
for name, method in inspect.getmembers( for name, method in inspect.getmembers(
Client, predicate=inspect.isfunction target, predicate=inspect.isfunction
): ):
if '_' == name[0]: if '_' == name[0]:
continue continue
@ -278,8 +308,11 @@ async def maybe_spawn_brokerd(
**kwargs, **kwargs,
) -> tractor._portal.Portal: ) -> tractor._portal.Portal:
async with tractor.find_actor('brokerd_ib') as portal: async with tractor.find_actor('brokerd_ib') as portal:
if portal is None: # no broker daemon created yet # WTF: why doesn't this work?
print(__name__)
if portal is not None:
yield portal
else: # no broker daemon created yet
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# XXX: this needs to somehow be hidden # XXX: this needs to somehow be hidden
portal = await n.start_actor( portal = await n.start_actor(
@ -305,15 +338,12 @@ async def get_client(
a method proxy to it. a method proxy to it.
""" """
async with maybe_spawn_brokerd(**kwargs) as portal: async with maybe_spawn_brokerd(**kwargs) as portal:
yield get_method_proxy(portal) yield get_method_proxy(portal, Client)
async def trio_stream_ticker(sym): async def trio_stream_ticker(sym):
get_console_log('info') get_console_log('info')
# con_es = ibis.ContFuture('ES', exchange='GLOBEX')
# es = ibis.Future('ES', '20200918', exchange='GLOBEX')
stream = await tractor.to_asyncio.run_task( stream = await tractor.to_asyncio.run_task(
_trio_run_client_method, _trio_run_client_method,
method='stream_ticker', method='stream_ticker',
@ -321,14 +351,18 @@ async def trio_stream_ticker(sym):
) )
async with aclosing(stream): async with aclosing(stream):
async for ticker in stream: async for ticker in stream:
lft = ticker.lastFillTime # TODO: validate this value
lft = ticker.rtTime
for tick_data in ticker.ticks: for tick_data in ticker.ticks:
value = tick_data._asdict() value = tick_data._asdict()
now = time.time() now = time.time()
value['time'] = now value['time'] = now
value['last_fill_time'] = lft value['last_fill_time'] = lft
if lft: if lft:
# convert from milliseconds
lft = float(lft) / 1000.
value['latency'] = now - lft value['latency'] = now - lft
yield value yield value
@ -346,13 +380,12 @@ async def stream_from_brokerd(sym):
if __name__ == '__main__': if __name__ == '__main__':
import sys import sys
sym = sys.argv[1] sym = sys.argv[1]
tractor.run( contract = asyncio.run(
stream_from_brokerd, _aio_run_client_method(
sym, 'find_contract',
# XXX: must be multiprocessing symbol=sym,
start_method='forkserver', )
loglevel='info'
) )
print(contract)