diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 0e200c9f..06ab8774 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1,16 +1,29 @@ """ Interactive Brokers API backend. + +Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is +built on it) and thus actor aware apis must be spawned with +``infected_aio==True``. """ import asyncio from dataclasses import asdict -from typing import List, Dict, Any +from functools import partial +import inspect +from typing import List, Dict, Any, Tuple from contextlib import asynccontextmanager +import time -import trio +import tractor +from async_generator import aclosing import ib_insync as ibis from ib_insync.ticker import Ticker from ib_insync.contract import Contract, ContractDetails +from ..log import get_logger, get_console_log + + +log = get_logger(__name__) + _time_frames = { '1s': '1 Sec', @@ -35,14 +48,14 @@ _time_frames = { class Client: """IB wrapped for our broker backend API. + + Note: this client requires running inside an ``asyncio`` loop. """ def __init__( self, ib: ibis.IB, ) -> None: self.ib = ib - # connect data feed callback... - self.ib.pendingTickersEvent.connect(self.on_tickers) async def bars( self, @@ -57,7 +70,7 @@ class Client: """ contract = ibis.ContFuture('ES', exchange='GLOBEX') # contract = ibis.Stock('WEED', 'SMART', 'CAD') - bars = self.ib.reqHistoricalData( + bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime='', # durationStr='60 S', @@ -88,16 +101,25 @@ class Client: Return a dictionary of ``upto`` entries worth of contract details. """ - descriptions = self.ib.reqMatchingSymbols(pattern) + descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) + + futs = [] + for d in descriptions: + con = d.contract + futs.append(self.ib.reqContractDetailsAsync(con)) + + # batch request all details + results = await asyncio.gather(*futs) + + # XXX: if there is more then one entry in the details list details = {} - for description in descriptions: - con = description.contract - deats = self.ib.reqContractDetails(con) - # XXX: if there is more then one entry in the details list + for details_set in results: # then the contract is so called "ambiguous". - for d in deats: + for d in details_set: + con = d.contract unique_sym = f'{con.symbol}.{con.primaryExchange}' details[unique_sym] = asdict(d) if asdicts else d + if len(details) == upto: return details @@ -118,6 +140,22 @@ class Client: ) -> Contract: raise NotImplementedError + async def stream_ticker( + self, + symbol: str, + to_trio, + opts: Tuple[int] = ('233', '375'), + ) -> None: + """Stream a ticker using the std L1 api. + """ + sym, exch = symbol.split('.') + contract = ibis.Stock(sym.upper(), exchange=exch.upper()) + ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) + ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) + + # let the engine run and stream + await self.ib.disconnectedEvent + # default config ports _tws_port: int = 7497 @@ -125,7 +163,7 @@ _gw_port: int = 4002 @asynccontextmanager -async def get_client( +async def _aio_get_client( host: str = '127.0.0.1', port: int = None, client_id: int = 1, @@ -133,8 +171,7 @@ async def get_client( """Return an ``ib_insync.IB`` instance wrapped in our client API. """ ib = ibis.IB() - # TODO: some detection magic to figure out if tws vs. the - # gateway is up ad choose the appropriate port + if port is None: ports = [_tws_port, _gw_port] else: @@ -152,91 +189,170 @@ async def get_client( else: raise ConnectionRefusedError(_err) - yield Client(ib) - ib.disconnect() + try: + yield Client(ib) + except BaseException: + ib.disconnect() + raise + + +async def _aio_run_client_method( + meth: str, + to_trio, + from_trio, + **kwargs, +) -> None: + log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") + async with _aio_get_client() as client: + + async_meth = getattr(client, meth) + + # handle streaming methods + args = tuple(inspect.getfullargspec(async_meth).args) + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + + return await async_meth(**kwargs) + + +async def _trio_run_client_method( + method: str, + **kwargs, +) -> None: + ca = tractor.current_actor() + assert ca.is_infected_aio() + + # if the method is an async gen stream for it + meth = getattr(Client, method) + if inspect.isasyncgenfunction(meth): + kwargs['_treat_as_stream'] = True + + # if the method is an async func but streams back results + # make sure to also stream from it + args = tuple(inspect.getfullargspec(meth).args) + if 'to_trio' in args: + kwargs['_treat_as_stream'] = True + + result = await tractor.to_asyncio.run_task( + _aio_run_client_method, + meth=method, + **kwargs + ) + return result + + +def get_method_proxy(portal): + + class MethodProxy: + def __init__(self, portal: tractor._portal.Portal): + self._portal = portal + + async def _run_method( + self, + *, + meth: str = None, + **kwargs + ) -> Any: + return await self._portal.run( + __name__, + '_trio_run_client_method', + method=meth, + **kwargs + ) + + proxy = MethodProxy(portal) + + # mock all remote methods + for name, method in inspect.getmembers( + Client, predicate=inspect.isfunction + ): + if '_' == name[0]: + continue + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + return proxy + + +@asynccontextmanager +async def maybe_spawn_brokerd( + **kwargs, +) -> tractor._portal.Portal: + async with tractor.find_actor('brokerd_ib') as portal: + if portal is None: # no broker daemon created yet + + async with tractor.open_nursery() as n: + # XXX: this needs to somehow be hidden + portal = await n.start_actor( + 'brokerd_ib', + rpc_module_paths=[__name__], + infect_asyncio=True, + ) + async with tractor.wait_for_actor( + 'brokerd_ib' + ) as portal: + yield portal + + # client code may block indefinitely so cancel when + # teardown is invoked + await n.cancel() + + +@asynccontextmanager +async def get_client( + **kwargs, +) -> Client: + """Init the ``ib_insync`` client in another actor and return + a method proxy to it. + """ + async with maybe_spawn_brokerd(**kwargs) as portal: + yield get_method_proxy(portal) + + +async def trio_stream_ticker(sym): + get_console_log('info') + + # con_es = ibis.ContFuture('ES', exchange='GLOBEX') + # es = ibis.Future('ES', '20200918', exchange='GLOBEX') + + stream = await tractor.to_asyncio.run_task( + _trio_run_client_method, + method='stream_ticker', + symbol=sym, + ) + async with aclosing(stream): + async for ticker in stream: + lft = ticker.lastFillTime + for tick_data in ticker.ticks: + value = tick_data._asdict() + now = time.time() + value['time'] = now + value['last_fill_time'] = lft + if lft: + value['latency'] = now - lft + yield value + + +async def stream_from_brokerd(sym): + + async with maybe_spawn_brokerd() as portal: + stream = await portal.run( + __name__, + 'trio_stream_ticker', + sym=sym, + ) + async for tick in stream: + print(f"trio got: {tick}") if __name__ == '__main__': + import sys - con_es = ibis.ContFuture('ES', exchange='GLOBEX') - es = ibis.Future('ES', '20200918', exchange='GLOBEX') - spy = ibis.Stock('SPY', exchange='ARCA') + sym = sys.argv[1] - # ticker = client.ib.reqTickByTickData( - # contract, - # tickType='Last', - # numberOfTicks=1, - # ) - # client.ib.reqTickByTickData( - # contract, - # tickType='AllLast', - # numberOfTicks=1, - # ) - # client.ib.reqTickByTickData( - # contract, - # tickType='BidAsk', - # numberOfTicks=1, - # ) - - # ITC (inter task comms) - from_trio = asyncio.Queue() - to_trio, from_aio = trio.open_memory_channel(float("inf")) - - async def start_ib(from_trio, to_trio): - print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!") - async with get_client() as client: - - # stream ticks to trio task - def ontick(ticker: Ticker): - for t in ticker.ticks: - # send tick data to trio - to_trio.send_nowait(t) - - ticker = client.ib.reqMktData(spy, '588', False, False, None) - ticker.updateEvent += ontick - - n = await from_trio.get() - assert n == 0 - - # sleep and let the engine run - await asyncio.sleep(float('inf')) - - # TODO: cmd processing from trio - # while True: - # n = await from_trio.get() - # print(f"aio got: {n}") - # to_trio.send_nowait(n + 1) - - async def trio_main(): - print("trio_main!") - - asyncio.create_task( - start_ib(from_trio, to_trio) - ) - - from_trio.put_nowait(0) - - async for tick in from_aio: - print(f"trio got: {tick}") - - # TODO: send cmds to asyncio - # from_trio.put_nowait(n + 1) - - async def aio_main(): - loop = asyncio.get_running_loop() - - trio_done_fut = asyncio.Future() - - def trio_done_callback(main_outcome): - print(f"trio_main finished: {main_outcome!r}") - trio_done_fut.set_result(main_outcome) - - trio.lowlevel.start_guest_run( - trio_main, - run_sync_soon_threadsafe=loop.call_soon_threadsafe, - done_callback=trio_done_callback, - ) - - (await trio_done_fut).unwrap() - - asyncio.run(aio_main()) + tractor.run( + stream_from_brokerd, + sym, + # XXX: must be multiprocessing + start_method='forkserver', + loglevel='info' + )