From 450a39ce1c49f1e1829d5a292b3db8a3a00e38f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Jul 2020 10:33:47 -0400 Subject: [PATCH] Add better contract search/lookup Add a `Client.find_contract()` which internally takes a . str as input and uses `IB.qualifyContractsAsync()` internally to try and validate the most likely contract. Make the module script call this using `asyncio.run()` for console testing. --- piker/brokers/ib.py | 123 ++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 06ab8774..b704d371 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -24,9 +24,16 @@ from ..log import get_logger, get_console_log log = get_logger(__name__) +_time_units = { + 's': ' sec', + 'm': ' mins', + 'h': ' hours', +} _time_frames = { '1s': '1 Sec', + '5s': '5 Sec', + '30s': '30 Sec', '1m': 'OneMinute', '2m': 'TwoMinutes', '3m': 'ThreeMinutes', @@ -63,33 +70,31 @@ class Client: # EST in ISO 8601 format is required... below is EPOCH start_date: str = "1970-01-01T00:00:00.000000-05:00", time_frame: str = '1m', - count: int = int(20e3), # <- max allowed per query + count: int = int(2e3), # <- max allowed per query is_paid_feed: bool = False, ) -> List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ - contract = ibis.ContFuture('ES', exchange='GLOBEX') - # contract = ibis.Stock('WEED', 'SMART', 'CAD') + contract = await self.find_contract(symbol) + # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime='', # durationStr='60 S', - durationStr='2000 S', - barSizeSetting='1 secs', + # durationStr='1 D', + durationStr='{count} S'.format(count=3000*5), + barSizeSetting='5 secs', whatToShow='TRADES', useRTH=False ) + assert bars # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) # convert to pandas dataframe: df = ibis.util.df(bars) - print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) + # print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) from piker.ui._source import from_df - a = from_df(df) - # breakpoint() - print(a) + return from_df(df) - # TODO: reimplement this using async batch requests - # see https://github.com/erdewit/ib_insync/issues/262 async def search_stocks( self, pattern: str, @@ -134,11 +139,41 @@ class Client: ) -> Dict[str, ContractDetails]: raise NotImplementedError - def get_cont_fute( + async def get_cont_fute( self, symbol: str, + exchange: str, ) -> Contract: - raise NotImplementedError + """Get an unqualifed contract for the current "continous" future. + """ + contcon = ibis.ContFuture(symbol, exchange=exchange) + frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] + return ibis.Future(conId=frontcon.conId) + + async def find_contract( + self, + symbol, + currency: str = 'USD', + **kwargs, + ) -> Contract: + # use heuristics to figure out contract "type" + sym, exch = symbol.upper().split('.') + + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): + con = await self.get_cont_fute(symbol=sym, exchange=exch) + + elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + con = ibis.Commodity(symbol=sym) + + else: + con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) + + try: + exch = 'SMART' if not exch else exch + contract = (await self.ib.qualifyContractsAsync(con))[0] + except IndexError: + raise ValueError(f"No contract could be found {con}") + return contract async def stream_ticker( self, @@ -148,8 +183,7 @@ class Client: ) -> None: """Stream a ticker using the std L1 api. """ - sym, exch = symbol.split('.') - contract = ibis.Stock(sym.upper(), exchange=exch.upper()) + contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) @@ -160,6 +194,8 @@ class Client: # default config ports _tws_port: int = 7497 _gw_port: int = 4002 +# list of ports to try in order +_try_ports = [_tws_port, _gw_port] @asynccontextmanager @@ -171,21 +207,15 @@ async def _aio_get_client( """Return an ``ib_insync.IB`` instance wrapped in our client API. """ ib = ibis.IB() - - if port is None: - ports = [_tws_port, _gw_port] - else: - ports = [port] - + ports = _try_ports if port is None else [port] _err = None - # try all default ports for port in ports: try: await ib.connectAsync(host, port, clientId=client_id) break except ConnectionRefusedError as ce: _err = ce - print(f'failed to connect on {port}') + log.warning(f'Failed to connect on {port}') else: raise ConnectionRefusedError(_err) @@ -198,8 +228,8 @@ async def _aio_get_client( async def _aio_run_client_method( meth: str, - to_trio, - from_trio, + to_trio=None, + from_trio=None, **kwargs, ) -> None: log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") @@ -209,7 +239,7 @@ async def _aio_run_client_method( # handle streaming methods args = tuple(inspect.getfullargspec(async_meth).args) - if 'to_trio' in args: + if to_trio and 'to_trio' in args: kwargs['to_trio'] = to_trio return await async_meth(**kwargs) @@ -222,13 +252,13 @@ async def _trio_run_client_method( ca = tractor.current_actor() assert ca.is_infected_aio() - # if the method is an async gen stream for it + # if the method is an *async gen* stream for it meth = getattr(Client, method) if inspect.isasyncgenfunction(meth): kwargs['_treat_as_stream'] = True - # if the method is an async func but streams back results - # make sure to also stream from it + # if the method is an *async func* but manually + # streams back results, make sure to also stream it args = tuple(inspect.getfullargspec(meth).args) if 'to_trio' in args: kwargs['_treat_as_stream'] = True @@ -241,7 +271,7 @@ async def _trio_run_client_method( return result -def get_method_proxy(portal): +def get_method_proxy(portal, target): class MethodProxy: def __init__(self, portal: tractor._portal.Portal): @@ -264,7 +294,7 @@ def get_method_proxy(portal): # mock all remote methods for name, method in inspect.getmembers( - Client, predicate=inspect.isfunction + target, predicate=inspect.isfunction ): if '_' == name[0]: continue @@ -278,8 +308,11 @@ async def maybe_spawn_brokerd( **kwargs, ) -> tractor._portal.Portal: async with tractor.find_actor('brokerd_ib') as portal: - if portal is None: # no broker daemon created yet - + # WTF: why doesn't this work? + print(__name__) + if portal is not None: + yield portal + else: # no broker daemon created yet async with tractor.open_nursery() as n: # XXX: this needs to somehow be hidden portal = await n.start_actor( @@ -305,15 +338,12 @@ async def get_client( a method proxy to it. """ async with maybe_spawn_brokerd(**kwargs) as portal: - yield get_method_proxy(portal) + yield get_method_proxy(portal, Client) async def trio_stream_ticker(sym): get_console_log('info') - # con_es = ibis.ContFuture('ES', exchange='GLOBEX') - # es = ibis.Future('ES', '20200918', exchange='GLOBEX') - stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', @@ -321,14 +351,18 @@ async def trio_stream_ticker(sym): ) async with aclosing(stream): async for ticker in stream: - lft = ticker.lastFillTime + # TODO: validate this value + lft = ticker.rtTime for tick_data in ticker.ticks: value = tick_data._asdict() now = time.time() value['time'] = now value['last_fill_time'] = lft if lft: + # convert from milliseconds + lft = float(lft) / 1000. value['latency'] = now - lft + yield value @@ -346,13 +380,12 @@ async def stream_from_brokerd(sym): if __name__ == '__main__': import sys - sym = sys.argv[1] - tractor.run( - stream_from_brokerd, - sym, - # XXX: must be multiprocessing - start_method='forkserver', - loglevel='info' + contract = asyncio.run( + _aio_run_client_method( + 'find_contract', + symbol=sym, + ) ) + print(contract)