From 1abadeb5063e8bdfd14cafd9d293fc3e4ef1b019 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 13:17:29 -0400 Subject: [PATCH] Add initial IB broker backend using ib_insync Start working towards meeting the backend client api. Infect `asyncio` using `trio`'s new guest mode and demonstrate real-time ticker streaming to console. --- piker/brokers/ib.py | 242 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 piker/brokers/ib.py diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py new file mode 100644 index 00000000..0e200c9f --- /dev/null +++ b/piker/brokers/ib.py @@ -0,0 +1,242 @@ +""" +Interactive Brokers API backend. +""" +import asyncio +from dataclasses import asdict +from typing import List, Dict, Any +from contextlib import asynccontextmanager + +import trio +import ib_insync as ibis +from ib_insync.ticker import Ticker +from ib_insync.contract import Contract, ContractDetails + + +_time_frames = { + '1s': '1 Sec', + '1m': 'OneMinute', + '2m': 'TwoMinutes', + '3m': 'ThreeMinutes', + '4m': 'FourMinutes', + '5m': 'FiveMinutes', + '10m': 'TenMinutes', + '15m': 'FifteenMinutes', + '20m': 'TwentyMinutes', + '30m': 'HalfHour', + '1h': 'OneHour', + '2h': 'TwoHours', + '4h': 'FourHours', + 'D': 'OneDay', + 'W': 'OneWeek', + 'M': 'OneMonth', + 'Y': 'OneYear', +} + + +class Client: + """IB wrapped for our broker backend API. + """ + def __init__( + self, + ib: ibis.IB, + ) -> None: + self.ib = ib + # connect data feed callback... + self.ib.pendingTickersEvent.connect(self.on_tickers) + + async def bars( + self, + symbol: str, + # EST in ISO 8601 format is required... below is EPOCH + start_date: str = "1970-01-01T00:00:00.000000-05:00", + time_frame: str = '1m', + count: int = int(20e3), # <- max allowed per query + is_paid_feed: bool = False, + ) -> List[Dict[str, Any]]: + """Retreive OHLCV bars for a symbol over a range to the present. + """ + contract = ibis.ContFuture('ES', exchange='GLOBEX') + # contract = ibis.Stock('WEED', 'SMART', 'CAD') + bars = self.ib.reqHistoricalData( + contract, + endDateTime='', + # durationStr='60 S', + durationStr='2000 S', + barSizeSetting='1 secs', + whatToShow='TRADES', + useRTH=False + ) + # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) + # convert to pandas dataframe: + df = ibis.util.df(bars) + print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) + from piker.ui._source import from_df + a = from_df(df) + # breakpoint() + print(a) + + # TODO: reimplement this using async batch requests + # see https://github.com/erdewit/ib_insync/issues/262 + async def search_stocks( + self, + pattern: str, + # how many contracts to search "up to" + upto: int = 3, + asdicts: bool = True, + ) -> Dict[str, ContractDetails]: + """Search for stocks matching provided ``str`` pattern. + + Return a dictionary of ``upto`` entries worth of contract details. + """ + descriptions = self.ib.reqMatchingSymbols(pattern) + details = {} + for description in descriptions: + con = description.contract + deats = self.ib.reqContractDetails(con) + # XXX: if there is more then one entry in the details list + # then the contract is so called "ambiguous". + for d in deats: + unique_sym = f'{con.symbol}.{con.primaryExchange}' + details[unique_sym] = asdict(d) if asdicts else d + if len(details) == upto: + return details + + return details + + async def search_futes( + self, + pattern: str, + # how many contracts to search "up to" + upto: int = 3, + asdicts: bool = True, + ) -> Dict[str, ContractDetails]: + raise NotImplementedError + + def get_cont_fute( + self, + symbol: str, + ) -> Contract: + raise NotImplementedError + + +# default config ports +_tws_port: int = 7497 +_gw_port: int = 4002 + + +@asynccontextmanager +async def get_client( + host: str = '127.0.0.1', + port: int = None, + client_id: int = 1, +) -> Client: + """Return an ``ib_insync.IB`` instance wrapped in our client API. + """ + ib = ibis.IB() + # TODO: some detection magic to figure out if tws vs. the + # gateway is up ad choose the appropriate port + if port is None: + ports = [_tws_port, _gw_port] + else: + ports = [port] + + _err = None + # try all default ports + for port in ports: + try: + await ib.connectAsync(host, port, clientId=client_id) + break + except ConnectionRefusedError as ce: + _err = ce + print(f'failed to connect on {port}') + else: + raise ConnectionRefusedError(_err) + + yield Client(ib) + ib.disconnect() + + +if __name__ == '__main__': + + con_es = ibis.ContFuture('ES', exchange='GLOBEX') + es = ibis.Future('ES', '20200918', exchange='GLOBEX') + spy = ibis.Stock('SPY', exchange='ARCA') + + # ticker = client.ib.reqTickByTickData( + # contract, + # tickType='Last', + # numberOfTicks=1, + # ) + # client.ib.reqTickByTickData( + # contract, + # tickType='AllLast', + # numberOfTicks=1, + # ) + # client.ib.reqTickByTickData( + # contract, + # tickType='BidAsk', + # numberOfTicks=1, + # ) + + # ITC (inter task comms) + from_trio = asyncio.Queue() + to_trio, from_aio = trio.open_memory_channel(float("inf")) + + async def start_ib(from_trio, to_trio): + print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!") + async with get_client() as client: + + # stream ticks to trio task + def ontick(ticker: Ticker): + for t in ticker.ticks: + # send tick data to trio + to_trio.send_nowait(t) + + ticker = client.ib.reqMktData(spy, '588', False, False, None) + ticker.updateEvent += ontick + + n = await from_trio.get() + assert n == 0 + + # sleep and let the engine run + await asyncio.sleep(float('inf')) + + # TODO: cmd processing from trio + # while True: + # n = await from_trio.get() + # print(f"aio got: {n}") + # to_trio.send_nowait(n + 1) + + async def trio_main(): + print("trio_main!") + + asyncio.create_task( + start_ib(from_trio, to_trio) + ) + + from_trio.put_nowait(0) + + async for tick in from_aio: + print(f"trio got: {tick}") + + # TODO: send cmds to asyncio + # from_trio.put_nowait(n + 1) + + async def aio_main(): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + print(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + (await trio_done_fut).unwrap() + + asyncio.run(aio_main())