Port to new streaming api, yield whole tickers
							parent
							
								
									450a39ce1c
								
							
						
					
					
						commit
						41c6517a23
					
				|  | @ -2,28 +2,35 @@ | |||
| Interactive Brokers API backend. | ||||
| 
 | ||||
| Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is | ||||
| built on it) and thus actor aware apis must be spawned with | ||||
| built on it) and thus actor aware API calls must be spawned with | ||||
| ``infected_aio==True``. | ||||
| """ | ||||
| import asyncio | ||||
| from contextlib import asynccontextmanager | ||||
| from dataclasses import asdict | ||||
| from functools import partial | ||||
| from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator | ||||
| import asyncio | ||||
| import inspect | ||||
| from typing import List, Dict, Any, Tuple | ||||
| from contextlib import asynccontextmanager | ||||
| import itertools | ||||
| import time | ||||
| 
 | ||||
| import tractor | ||||
| from async_generator import aclosing | ||||
| import ib_insync as ibis | ||||
| from ib_insync.ticker import Ticker | ||||
| from ib_insync.contract import Contract, ContractDetails | ||||
| from ib_insync.ticker import Ticker | ||||
| import ib_insync as ibis | ||||
| import tractor | ||||
| 
 | ||||
| from ..log import get_logger, get_console_log | ||||
| from ..data import maybe_spawn_brokerd | ||||
| from ..ui._source import from_df | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| # passed to ``tractor.ActorNursery.start_actor()`` | ||||
| _spawn_kwargs = { | ||||
|     'infect_asyncio': True, | ||||
| } | ||||
| _time_units = { | ||||
|     's': ' sec', | ||||
|     'm': ' mins', | ||||
|  | @ -82,17 +89,15 @@ class Client: | |||
|             endDateTime='', | ||||
|             # durationStr='60 S', | ||||
|             # durationStr='1 D', | ||||
|             durationStr='{count} S'.format(count=3000*5), | ||||
|             durationStr='{count} S'.format(count=3000 * 5), | ||||
|             barSizeSetting='5 secs', | ||||
|             whatToShow='TRADES', | ||||
|             useRTH=False | ||||
|         ) | ||||
|         # TODO: raise underlying error here | ||||
|         assert bars | ||||
|         # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) | ||||
|         # convert to pandas dataframe: | ||||
|         df = ibis.util.df(bars) | ||||
|         # print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) | ||||
|         from piker.ui._source import from_df | ||||
|         return from_df(df) | ||||
| 
 | ||||
|     async def search_stocks( | ||||
|  | @ -194,7 +199,6 @@ class Client: | |||
| # default config ports | ||||
| _tws_port: int = 7497 | ||||
| _gw_port: int = 4002 | ||||
| # list of ports to try in order | ||||
| _try_ports = [_tws_port, _gw_port] | ||||
| 
 | ||||
| 
 | ||||
|  | @ -202,10 +206,20 @@ _try_ports = [_tws_port, _gw_port] | |||
| async def _aio_get_client( | ||||
|     host: str = '127.0.0.1', | ||||
|     port: int = None, | ||||
|     client_id: int = 1, | ||||
|     client_id: Optional[int] = None, | ||||
| ) -> Client: | ||||
|     """Return an ``ib_insync.IB`` instance wrapped in our client API. | ||||
|     """ | ||||
|     if client_id is None: | ||||
|         # if this is a persistent brokerd, try to allocate a new id for | ||||
|         # each client | ||||
|         try: | ||||
|             ss = tractor.current_actor().statespace | ||||
|             client_id = next(ss.setdefault('client_ids', itertools.count())) | ||||
|         except RuntimeError: | ||||
|             # tractor likely isn't running | ||||
|             client_id = 1 | ||||
| 
 | ||||
|     ib = ibis.IB() | ||||
|     ports = _try_ports if port is None else [port] | ||||
|     _err = None | ||||
|  | @ -271,26 +285,27 @@ async def _trio_run_client_method( | |||
|     return result | ||||
| 
 | ||||
| 
 | ||||
| def get_method_proxy(portal, target): | ||||
| class _MethodProxy: | ||||
|     def __init__(self, portal: tractor._portal.Portal): | ||||
|         self._portal = portal | ||||
| 
 | ||||
|     class MethodProxy: | ||||
|         def __init__(self, portal: tractor._portal.Portal): | ||||
|             self._portal = portal | ||||
| 
 | ||||
|         async def _run_method( | ||||
|             self, | ||||
|             *, | ||||
|             meth: str = None, | ||||
|     async def _run_method( | ||||
|         self, | ||||
|         *, | ||||
|         meth: str = None, | ||||
|         **kwargs | ||||
|     ) -> Any: | ||||
|         return await self._portal.run( | ||||
|             __name__, | ||||
|             '_trio_run_client_method', | ||||
|             method=meth, | ||||
|             **kwargs | ||||
|         ) -> Any: | ||||
|             return await self._portal.run( | ||||
|                 __name__, | ||||
|                 '_trio_run_client_method', | ||||
|                 method=meth, | ||||
|                 **kwargs | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|     proxy = MethodProxy(portal) | ||||
| 
 | ||||
| def get_method_proxy(portal, target) -> _MethodProxy: | ||||
| 
 | ||||
|     proxy = _MethodProxy(portal) | ||||
| 
 | ||||
|     # mock all remote methods | ||||
|     for name, method in inspect.getmembers( | ||||
|  | @ -303,33 +318,6 @@ def get_method_proxy(portal, target): | |||
|     return proxy | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def maybe_spawn_brokerd( | ||||
|     **kwargs, | ||||
| ) -> tractor._portal.Portal: | ||||
|     async with tractor.find_actor('brokerd_ib') as portal: | ||||
|         # WTF: why doesn't this work? | ||||
|         print(__name__) | ||||
|         if portal is not None: | ||||
|             yield portal | ||||
|         else:  # no broker daemon created yet | ||||
|             async with tractor.open_nursery() as n: | ||||
|                 # XXX: this needs to somehow be hidden | ||||
|                 portal = await n.start_actor( | ||||
|                     'brokerd_ib', | ||||
|                     rpc_module_paths=[__name__], | ||||
|                     infect_asyncio=True, | ||||
|                 ) | ||||
|                 async with tractor.wait_for_actor( | ||||
|                     'brokerd_ib' | ||||
|                 ) as portal: | ||||
|                     yield portal | ||||
| 
 | ||||
|                 # client code may block indefinitely so cancel when | ||||
|                 # teardown is invoked | ||||
|                 await n.cancel() | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def get_client( | ||||
|     **kwargs, | ||||
|  | @ -337,45 +325,47 @@ async def get_client( | |||
|     """Init the ``ib_insync`` client in another actor and return | ||||
|     a method proxy to it. | ||||
|     """ | ||||
|     async with maybe_spawn_brokerd(**kwargs) as portal: | ||||
|     async with maybe_spawn_brokerd( | ||||
|         brokername='ib', | ||||
|         expose_mods=[__name__], | ||||
|         infect_asyncio=True, | ||||
|         **kwargs | ||||
|     ) as portal: | ||||
|         yield get_method_proxy(portal, Client) | ||||
| 
 | ||||
| 
 | ||||
| async def trio_stream_ticker(sym): | ||||
| async def stream_quotes( | ||||
|     symbols: List[str], | ||||
| ) -> AsyncGenerator[str, Dict[str, Any]]: | ||||
|     """Stream symbol quotes. | ||||
| 
 | ||||
|     This is a ``trio`` callable routine meant to be invoked | ||||
|     once the brokerd is up. | ||||
|     """ | ||||
|     get_console_log('info') | ||||
| 
 | ||||
|     stream = await tractor.to_asyncio.run_task( | ||||
|         _trio_run_client_method, | ||||
|         method='stream_ticker', | ||||
|         symbol=sym, | ||||
|         symbol=symbols[0], | ||||
|     ) | ||||
|     async with aclosing(stream): | ||||
|         async for ticker in stream: | ||||
|             # TODO: validate this value | ||||
|             lft = ticker.rtTime | ||||
|             for tick_data in ticker.ticks: | ||||
|                 value = tick_data._asdict() | ||||
|                 now = time.time() | ||||
|                 value['time'] = now | ||||
|                 value['last_fill_time'] = lft | ||||
|                 if lft: | ||||
|                     # convert from milliseconds | ||||
|                     lft = float(lft) / 1000. | ||||
|                     value['latency'] = now - lft | ||||
|             # convert named tuples to dicts so we send usable keys | ||||
|             # for tick_data in ticker.ticks: | ||||
|             ticker.ticks = [td._asdict() for td in ticker.ticks] | ||||
| 
 | ||||
|                 yield value | ||||
|             data = asdict(ticker) | ||||
| 
 | ||||
|             # add time stamps for downstream latency measurements | ||||
|             data['brokerd_ts'] = time.time() | ||||
|             if ticker.rtTime: | ||||
|                 data['rtTime_s'] = float(ticker.rtTime) / 1000. | ||||
| 
 | ||||
| async def stream_from_brokerd(sym): | ||||
|             yield data | ||||
| 
 | ||||
|     async with maybe_spawn_brokerd() as portal: | ||||
|         stream = await portal.run( | ||||
|             __name__, | ||||
|             'trio_stream_ticker', | ||||
|             sym=sym, | ||||
|         ) | ||||
|         async for tick in stream: | ||||
|             print(f"trio got: {tick}") | ||||
|             # ugh, clear ticks since we've consumed them | ||||
|             ticker.ticks = [] | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue