diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 6c9f0b0d..bba197f3 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -29,8 +29,9 @@ from functools import partial import itertools from math import isnan from typing import ( - Any, Optional, + Any, Callable, Optional, AsyncIterator, Awaitable, + Union, ) import asyncio from pprint import pformat @@ -43,6 +44,7 @@ import time import trio from trio_typing import TaskStatus import tractor +from tractor import to_asyncio from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails, Option @@ -58,7 +60,7 @@ import numpy as np from .. import config from ..log import get_logger, get_console_log -from .._daemon import maybe_spawn_brokerd +# from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData @@ -217,11 +219,12 @@ _enters = 0 class Client: - """IB wrapped for our broker backend API. + ''' + IB wrapped for our broker backend API. Note: this client requires running inside an ``asyncio`` loop. - """ + ''' _contracts: dict[str, Contract] = {} def __init__( @@ -242,20 +245,22 @@ class Client: self, symbol: str, # EST in ISO 8601 format is required... below is EPOCH - start_dt: str = "1970-01-01T00:00:00.000000-05:00", - end_dt: str = "", + start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", + end_dt: Union[datetime, str ] = "", sample_period_s: str = 1, # ohlc sample period period_count: int = int(2e3), # <- max per 1s sample query is_paid_feed: bool = False, # placeholder ) -> list[dict[str, Any]]: - """Retreive OHLCV bars for a symbol over a range to the present. - """ + ''' + Retreive OHLCV bars for a symbol over a range to the present. + + ''' bars_kwargs = {'whatToShow': 'TRADES'} global _enters - print(f'ENTER BARS {_enters}') + print(f'ENTER BARS {_enters} @ end={end_dt}') _enters += 1 contract = await self.find_contract(symbol) @@ -984,7 +989,7 @@ async def _trio_run_client_method( # ): # kwargs['_treat_as_stream'] = True - return await tractor.to_asyncio.run_task( + return await to_asyncio.run_task( _aio_run_client_method, meth=method, client=client, @@ -992,60 +997,119 @@ async def _trio_run_client_method( ) -class _MethodProxy: +class MethodProxy: + def __init__( self, - portal: tractor.Portal + chan: to_asyncio.LinkedTaskChannel, + ) -> None: - self._portal = portal + self.chan = chan async def _run_method( self, *, meth: str = None, **kwargs + ) -> Any: - return await self._portal.run( - _trio_run_client_method, - method=meth, - **kwargs - ) + ''' + Make a ``Client`` method call by requesting through the + ``tractor.to_asyncio`` layer. + + ''' + chan = self.chan + # send through method + ``kwargs: dict`` as pair + await chan.send((meth, kwargs)) + return await chan.receive() -def get_client_proxy( +async def open_aio_client_method_relay( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, - portal: tractor.Portal, - target=Client, +) -> None: -) -> _MethodProxy: - - proxy = _MethodProxy(portal) - - # mock all remote methods - for name, method in inspect.getmembers( - target, predicate=inspect.isfunction + async with load_aio_clients() as ( + client, + clients, + accts2clients, ): - if '_' == name[0]: - continue - setattr(proxy, name, partial(proxy._run_method, meth=name)) + to_trio.send_nowait(client) - return proxy + # relay all method requests to ``asyncio``-side client and + # deliver back results + while True: + msg = await from_trio.get() + meth_name, kwargs = msg + + meth = getattr(client, meth_name) + resp = await meth(**kwargs) + + # echo the msg back + to_trio.send_nowait(resp) @acm -async def get_client( - **kwargs, -) -> Client: - """Init the ``ib_insync`` client in another actor and return - a method proxy to it. - """ - async with maybe_spawn_brokerd( - brokername='ib', - infect_asyncio=True, - **kwargs - ) as portal: - proxy_client = get_client_proxy(portal) - yield proxy_client +async def open_client_proxy() -> MethodProxy: + + try: + async with to_asyncio.open_channel_from( + open_aio_client_method_relay, + ) as (first, chan): + + assert isinstance(first, Client) + proxy = MethodProxy(chan) + + # mock all remote methods on ib ``Client``. + for name, method in inspect.getmembers( + Client, predicate=inspect.isfunction + ): + if '_' == name[0]: + continue + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + yield proxy + + except RequestError as err: + code, msg = err.code, err.message + + # TODO: retreive underlying ``ib_insync`` error? + if ( + code == 162 and ( + 'HMDS query returned no data' in msg + or 'No market data permissions for' in msg + ) + ): + # these cases should not cause a task crash + log.warning(msg) + + else: + raise + + +# @acm +# async def get_client( +# **kwargs, + +# ) -> Client: +# ''' +# Init the ``ib_insync`` client in another actor and return +# a method proxy to it. + +# ''' +# async with ( +# maybe_spawn_brokerd( +# brokername='ib', +# infect_asyncio=True, +# **kwargs +# ) as portal, +# ): +# assert 0 + # TODO: the IPC via portal relay layer for when this current + # actor isn't in aio mode. + # open_client_proxy() as proxy, + # yield proxy # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -1126,27 +1190,32 @@ def normalize( async def get_bars( + proxy: MethodProxy, sym: str, end_dt: str = "", ) -> (dict, np.ndarray): + ''' + Retrieve historical data from a ``trio``-side task using + a ``MethoProxy``. + ''' _err: Optional[Exception] = None - fails = 0 + bars: Optional[list] = None + for _ in range(2): try: - bars, bars_array = await _trio_run_client_method( - method='bars', + bars, bars_array = await proxy.bars( symbol=sym, end_dt=end_dt, ) - if bars_array is None: raise SymbolNotFound(sym) next_dt = bars[0].date + print(f'ib datetime {next_dt}') return (bars, bars_array, next_dt), fails @@ -1169,8 +1238,16 @@ async def get_bars( # error? # OLDER: seem to always cause throttling despite low rps - # raise err - break + # TODO: if there is not bars returned from the first + # query we need to manually calculate the next step + # back and convert to an expected datetime format. + # if not bars: + # raise + + # try to decrement start point and look further back + next_dt = bars[0].date + print(f'ib datetime {next_dt}') + continue elif 'No market data permissions for' in err.message: @@ -1197,6 +1274,41 @@ async def get_bars( # raise _err +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + async with open_client_proxy() as proxy: + + async def get_hist( + end_dt: str, + start_dt: str = '', + + ) -> tuple[np.ndarray, str]: + + out, fails = await get_bars(proxy, symbol, end_dt=end_dt) + + # TODO: add logic here to handle tradable hours and only grab + # valid bars in the range + if out == (None, None): + # could be trying to retreive bars over weekend + log.error(f"Can't grab bars starting at {end_dt}!?!?") + raise NoData(f'{end_dt}') + + bars, bars_array, next_dt = out + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + return bars_array, next_dt + + yield get_hist + + async def backfill_bars( sym: str, @@ -1219,56 +1331,60 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 ''' - if platform.system() == 'Windows': - log.warning( - 'Decreasing history query count to 4 since, windows...') - count = 4 + # async with open_history_client(sym) as proxy: + async with open_client_proxy() as proxy: - out, fails = await get_bars(sym) + if platform.system() == 'Windows': + log.warning( + 'Decreasing history query count to 4 since, windows...') + count = 4 - if out is None: - raise RuntimeError("Could not pull currrent history?!") + out, fails = await get_bars(proxy, sym) - (first_bars, bars_array, next_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 + if out is None: + raise RuntimeError("Could not pull currrent history?!") - # write historical data to buffer - shm.push(bars_array) + (first_bars, bars_array, next_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 - with trio.CancelScope() as cs: + # write historical data to buffer + shm.push(bars_array) - task_status.started(cs) + with trio.CancelScope() as cs: - i = 0 - while i < count: + task_status.started(cs) - out, fails = await get_bars(sym, end_dt=next_dt) + i = 0 + while i < count: - if fails is None or fails > 1: - break + out, fails = await get_bars(proxy, sym, end_dt=next_dt) - if out == (None, None): - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range - log.error(f"Can't grab bars starting at {next_dt}!?!?") - continue + if fails is None or fails > 1: + break - bars, bars_array, next_dt = out + if out == (None, None): + # could be trying to retreive bars over weekend + # TODO: add logic here to handle tradable hours and + # only grab valid bars in the range + log.error(f"Can't grab bars starting at {next_dt}!?!?") + continue - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. + bars, bars_array, next_dt = out - shm.push(bars_array, prepend=True) - i += 1 + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + # TODO we should probably dig into forums to see what peeps + # think this data "means" and then use it as an indicator of + # sorts? dinkus has mentioned that $vlms for the day dont' + # match other platforms nor the summary stat tws shows in + # the monitor - it's probably worth investigating. + + shm.push(bars_array, prepend=True) + i += 1 asset_type_map = {