diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b704d371..e874a8a0 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -2,28 +2,35 @@ Interactive Brokers API backend. Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is -built on it) and thus actor aware apis must be spawned with +built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ -import asyncio +from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +import asyncio import inspect -from typing import List, Dict, Any, Tuple -from contextlib import asynccontextmanager +import itertools import time -import tractor from async_generator import aclosing -import ib_insync as ibis -from ib_insync.ticker import Ticker from ib_insync.contract import Contract, ContractDetails +from ib_insync.ticker import Ticker +import ib_insync as ibis +import tractor from ..log import get_logger, get_console_log +from ..data import maybe_spawn_brokerd +from ..ui._source import from_df log = get_logger(__name__) +# passed to ``tractor.ActorNursery.start_actor()`` +_spawn_kwargs = { + 'infect_asyncio': True, +} _time_units = { 's': ' sec', 'm': ' mins', @@ -82,17 +89,15 @@ class Client: endDateTime='', # durationStr='60 S', # durationStr='1 D', - durationStr='{count} S'.format(count=3000*5), + durationStr='{count} S'.format(count=3000 * 5), barSizeSetting='5 secs', whatToShow='TRADES', useRTH=False ) + # TODO: raise underlying error here assert bars - # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) # convert to pandas dataframe: df = ibis.util.df(bars) - # print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) - from piker.ui._source import from_df return from_df(df) async def search_stocks( @@ -194,7 +199,6 @@ class Client: # default config ports _tws_port: int = 7497 _gw_port: int = 4002 -# list of ports to try in order _try_ports = [_tws_port, _gw_port] @@ -202,10 +206,20 @@ _try_ports = [_tws_port, _gw_port] async def _aio_get_client( host: str = '127.0.0.1', port: int = None, - client_id: int = 1, + client_id: Optional[int] = None, ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. """ + if client_id is None: + # if this is a persistent brokerd, try to allocate a new id for + # each client + try: + ss = tractor.current_actor().statespace + client_id = next(ss.setdefault('client_ids', itertools.count())) + except RuntimeError: + # tractor likely isn't running + client_id = 1 + ib = ibis.IB() ports = _try_ports if port is None else [port] _err = None @@ -271,26 +285,27 @@ async def _trio_run_client_method( return result -def get_method_proxy(portal, target): +class _MethodProxy: + def __init__(self, portal: tractor._portal.Portal): + self._portal = portal - class MethodProxy: - def __init__(self, portal: tractor._portal.Portal): - self._portal = portal - - async def _run_method( - self, - *, - meth: str = None, + async def _run_method( + self, + *, + meth: str = None, + **kwargs + ) -> Any: + return await self._portal.run( + __name__, + '_trio_run_client_method', + method=meth, **kwargs - ) -> Any: - return await self._portal.run( - __name__, - '_trio_run_client_method', - method=meth, - **kwargs - ) + ) - proxy = MethodProxy(portal) + +def get_method_proxy(portal, target) -> _MethodProxy: + + proxy = _MethodProxy(portal) # mock all remote methods for name, method in inspect.getmembers( @@ -303,33 +318,6 @@ def get_method_proxy(portal, target): return proxy -@asynccontextmanager -async def maybe_spawn_brokerd( - **kwargs, -) -> tractor._portal.Portal: - async with tractor.find_actor('brokerd_ib') as portal: - # WTF: why doesn't this work? - print(__name__) - if portal is not None: - yield portal - else: # no broker daemon created yet - async with tractor.open_nursery() as n: - # XXX: this needs to somehow be hidden - portal = await n.start_actor( - 'brokerd_ib', - rpc_module_paths=[__name__], - infect_asyncio=True, - ) - async with tractor.wait_for_actor( - 'brokerd_ib' - ) as portal: - yield portal - - # client code may block indefinitely so cancel when - # teardown is invoked - await n.cancel() - - @asynccontextmanager async def get_client( **kwargs, @@ -337,45 +325,47 @@ async def get_client( """Init the ``ib_insync`` client in another actor and return a method proxy to it. """ - async with maybe_spawn_brokerd(**kwargs) as portal: + async with maybe_spawn_brokerd( + brokername='ib', + expose_mods=[__name__], + infect_asyncio=True, + **kwargs + ) as portal: yield get_method_proxy(portal, Client) -async def trio_stream_ticker(sym): +async def stream_quotes( + symbols: List[str], +) -> AsyncGenerator[str, Dict[str, Any]]: + """Stream symbol quotes. + + This is a ``trio`` callable routine meant to be invoked + once the brokerd is up. + """ get_console_log('info') stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=sym, + symbol=symbols[0], ) async with aclosing(stream): async for ticker in stream: - # TODO: validate this value - lft = ticker.rtTime - for tick_data in ticker.ticks: - value = tick_data._asdict() - now = time.time() - value['time'] = now - value['last_fill_time'] = lft - if lft: - # convert from milliseconds - lft = float(lft) / 1000. - value['latency'] = now - lft + # convert named tuples to dicts so we send usable keys + # for tick_data in ticker.ticks: + ticker.ticks = [td._asdict() for td in ticker.ticks] - yield value + data = asdict(ticker) + # add time stamps for downstream latency measurements + data['brokerd_ts'] = time.time() + if ticker.rtTime: + data['rtTime_s'] = float(ticker.rtTime) / 1000. -async def stream_from_brokerd(sym): + yield data - async with maybe_spawn_brokerd() as portal: - stream = await portal.run( - __name__, - 'trio_stream_ticker', - sym=sym, - ) - async for tick in stream: - print(f"trio got: {tick}") + # ugh, clear ticks since we've consumed them + ticker.ticks = [] if __name__ == '__main__':