From 1c1661b783b9a5c940fea57da882fd99fbe0cbf2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 6 Jun 2022 15:27:05 -0400
Subject: [PATCH] Factor all data feed endpoints into `.ib.feed.py`

---
 piker/brokers/ib/__init__.py |  14 +-
 piker/brokers/ib/client.py   | 898 +--------------------------------
 piker/brokers/ib/feed.py     | 938 +++++++++++++++++++++++++++++++++++
 piker/data/feed.py           |   1 +
 4 files changed, 958 insertions(+), 893 deletions(-)
 create mode 100644 piker/brokers/ib/feed.py

diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py
index 877b0667..1d6bac33 100644
--- a/piker/brokers/ib/__init__.py
+++ b/piker/brokers/ib/__init__.py
@@ -32,15 +32,27 @@ Sub-modules within break into the core functionalities:
 """
 from .client import (
     get_client,
+    trades_dialogue,
+)
+from .feed import (
     open_history_client,
     open_symbol_search,
     stream_quotes,
-    trades_dialogue,
 )
 
+__all__ = [
+    'get_client',
+    'trades_dialogue',
+    'open_history_client',
+    'open_symbol_search',
+    'stream_quotes',
+]
+
+
 # tractor RPC enable arg
 __enable_modules__: list[str] = [
     'client',
+    'feed',
 ]
 
 # passed to ``tractor.ActorNursery.start_actor()``
diff --git a/piker/brokers/ib/client.py b/piker/brokers/ib/client.py
index cbbbd8d4..43626c04 100644
--- a/piker/brokers/ib/client.py
+++ b/piker/brokers/ib/client.py
@@ -28,8 +28,9 @@ from functools import partial
 import itertools
 from math import isnan
 from typing import (
-    Any, Callable, Optional,
-    AsyncIterator, Awaitable,
+    Any,
+    Optional,
+    AsyncIterator,
     Union,
 )
 import asyncio
@@ -43,7 +44,6 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from tractor import to_asyncio
-from async_generator import aclosing
 from ib_insync.wrapper import RequestError
 from ib_insync.contract import Contract, ContractDetails, Option
 from ib_insync.order import Order, Trade, OrderStatus
@@ -53,16 +53,12 @@ from ib_insync.objects import Position
 import ib_insync as ibis
 from ib_insync.wrapper import Wrapper
 from ib_insync.client import Client as ib_Client
-from fuzzywuzzy import process as fuzzy
 import numpy as np
-import pendulum
 
 from piker import config
 from piker.log import get_logger, get_console_log
 from piker.data._source import base_ohlc_dtype
-from piker.data._sharedmem import ShmArray
 
-from .._util import SymbolNotFound, NoData
 from piker.clearing._messages import (
     BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
     BrokerdPosition, BrokerdCancel,
@@ -1151,27 +1147,6 @@ def get_preferred_data_client(
     )
 
 
-@acm
-async def open_data_client() -> MethodProxy:
-    '''
-    Open the first found preferred "data client" as defined in the
-    user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
-    and deliver that client wrapped in a ``MethodProxy``.
-
-    '''
-    async with (
-        open_client_proxies() as (proxies, clients),
-    ):
-        account_name, client = get_preferred_data_client(clients)
-        proxy = proxies.get(f'ib.{account_name}')
-        if not proxy:
-            raise ValueError(
-                f'No preferred data client could be found for {account_name}!'
-            )
-
-        yield proxy
-
-
 class MethodProxy:
 
     def __init__(
@@ -1355,725 +1330,14 @@ async def get_client(
     a method proxy to it.
 
     '''
+    from .feed import open_data_client
+
     # TODO: the IPC via portal relay layer for when this current
     # actor isn't in aio mode.
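+    # NOTE: ``open_data_client`` is imported inside the function body
+    # above since ``.feed`` itself imports from this module - a
+    # module-scope import here would be circular.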
     async with open_data_client() as proxy:
         yield proxy
 
 
-# https://interactivebrokers.github.io/tws-api/tick_types.html
-tick_types = {
-    77: 'trade',
-
-    # a "utrade" aka an off exchange "unreportable" (dark) vlm:
-    # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
-    48: 'dark_trade',
-
-    # standard L1 ticks
-    0: 'bsize',
-    1: 'bid',
-    2: 'ask',
-    3: 'asize',
-    4: 'last',
-    5: 'size',
-    8: 'volume',
-
-    # ``ib_insync`` already packs these into
-    # quotes under the following fields.
-    # 55: 'trades_per_min',  # `'tradeRate'`
-    # 56: 'vlm_per_min',  # `'volumeRate'`
-    # 89: 'shortable',  # `'shortableShares'`
-}
-
-
-# TODO: cython/mypyc/numba this!
-def normalize(
-    ticker: Ticker,
-    calc_price: bool = False
-
-) -> dict:
-
-    # should be real volume for this contract by default
-    calc_price = False
-
-    # check for special contract types
-    con = ticker.contract
-    if type(con) in (
-        ibis.Commodity,
-        ibis.Forex,
-    ):
-        # commodities and forex don't have an exchange name and
-        # no real volume so we have to calculate the price
-        suffix = con.secType
-        # no real volume on this tract
-        calc_price = True
-
-    else:
-        suffix = con.primaryExchange
-        if not suffix:
-            suffix = con.exchange
-
-    # append a `.` to the returned symbol
-    # key for derivatives that normally is the expiry
-    # date key.
-    expiry = con.lastTradeDateOrContractMonth
-    if expiry:
-        suffix += f'.{expiry}'
-
-    # convert named tuples to dicts so we send usable keys
-    new_ticks = []
-    for tick in ticker.ticks:
-        if tick and not isinstance(tick, dict):
-            td = tick._asdict()
-            td['type'] = tick_types.get(
-                td['tickType'],
-                'n/a',
-            )
-
-            new_ticks.append(td)
-
-    tbt = ticker.tickByTicks
-    if tbt:
-        print(f'tickbyticks:\n {ticker.tickByTicks}')
-
-    ticker.ticks = new_ticks
-
-    # some contracts don't have volume so we may want to calculate
-    # a midpoint price based on data we can acquire (such as bid / ask)
-    if calc_price:
-        ticker.ticks.append(
-            {'type': 'trade', 'price': ticker.marketPrice()}
-        )
-
-    # serialize for transport
-    data = asdict(ticker)
-
-    # generate fqsn with possible specialized suffix
-    # for derivatives, note the lowercase.
-    data['symbol'] = data['fqsn'] = '.'.join(
-        (con.symbol, suffix)
-    ).lower()
-
-    # convert named tuples to dicts for transport
-    tbts = data.get('tickByTicks')
-    if tbts:
-        data['tickByTicks'] = [tbt._asdict() for tbt in tbts]
-
-    # add time stamps for downstream latency measurements
-    data['brokerd_ts'] = time.time()
-
-    # stupid stupid shit...don't even care any more..
-    # leave it until we do a proper latency study
-    # if ticker.rtTime is not None:
-    #     data['broker_ts'] = data['rtTime_s'] = float(
-    #         ticker.rtTime.timestamp) / 1000.
-    data.pop('rtTime')
-
-    return data
-
-
-_pacing: str = (
-    'Historical Market Data Service error '
-    'message:Historical data request pacing violation'
-)
-
-
-async def get_bars(
-
-    proxy: MethodProxy,
-    fqsn: str,
-
-    # blank to start which tells ib to look up the latest datum
-    end_dt: str = '',
-
-) -> (dict, np.ndarray):
-    '''
-    Retrieve historical data from a ``trio``-side task using
-    a ``MethoProxy``.
-
-    '''
-    fails = 0
-    bars: Optional[list] = None
-    first_dt: datetime = None
-    last_dt: datetime = None
-
-    if end_dt:
-        last_dt = pendulum.from_timestamp(end_dt.timestamp())
-
-    for _ in range(10):
-        try:
-            out = await proxy.bars(
-                fqsn=fqsn,
-                end_dt=end_dt,
-            )
-            if out:
-                bars, bars_array = out
-
-            else:
-                await tractor.breakpoint()
-
-            if bars_array is None:
-                raise SymbolNotFound(fqsn)
-
-            first_dt = pendulum.from_timestamp(
-                bars[0].date.timestamp())
-
-            last_dt = pendulum.from_timestamp(
-                bars[-1].date.timestamp())
-
-            time = bars_array['time']
-            assert time[-1] == last_dt.timestamp()
-            assert time[0] == first_dt.timestamp()
-            log.info(
-                f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
-            )
-
-            return (bars, bars_array, first_dt, last_dt), fails
-
-        except RequestError as err:
-            msg = err.message
-            # why do we always need to rebind this?
-            # _err = err
-
-            if 'No market data permissions for' in msg:
-                # TODO: signalling for no permissions searches
-                raise NoData(
-                    f'Symbol: {fqsn}',
-                )
-
-            elif (
-                err.code == 162
-                and 'HMDS query returned no data' in err.message
-            ):
-                # XXX: this is now done in the storage mgmt layer
-                # and we shouldn't implicitly decrement the frame dt
-                # index since the upper layer may be doing so
-                # concurrently and we don't want to be delivering frames
-                # that weren't asked for.
-                log.warning(
-                    f'NO DATA found ending @ {end_dt}\n'
-                )
-
-                # try to decrement start point and look further back
-                # end_dt = last_dt = last_dt.subtract(seconds=2000)
-
-                raise NoData(
-                    f'Symbol: {fqsn}',
-                    frame_size=2000,
-                )
-
-            elif _pacing in msg:
-
-                log.warning(
-                    'History throttle rate reached!\n'
-                    'Resetting farms with `ctrl-alt-f` hack\n'
-                )
-                # TODO: we might have to put a task lock around this
-                # method..
-                hist_ev = proxy.status_event(
-                    'HMDS data farm connection is OK:ushmds'
-                )
-
-                # XXX: other event messages we might want to try and
-                # wait for but i wasn't able to get any of this
-                # reliable..
-                # reconnect_start = proxy.status_event(
-                #     'Market data farm is connecting:usfuture'
-                # )
-                # live_ev = proxy.status_event(
-                #     'Market data farm connection is OK:usfuture'
-                # )
-
-                # try to wait on the reset event(s) to arrive, a timeout
-                # will trigger a retry up to 6 times (for now).
-                tries: int = 2
-                timeout: float = 10
-
-                # try 3 time with a data reset then fail over to
-                # a connection reset.
-                for i in range(1, tries):
-
-                    log.warning('Sending DATA RESET request')
-                    await data_reset_hack(reset_type='data')
-
-                    with trio.move_on_after(timeout) as cs:
-                        for name, ev in [
-                            # TODO: not sure if waiting on other events
-                            # is all that useful here or not. in theory
-                            # you could wait on one of the ones above
-                            # first to verify the reset request was
-                            # sent?
-                            ('history', hist_ev),
-                        ]:
-                            await ev.wait()
-                            log.info(f"{name} DATA RESET")
-                            break
-
-                    if cs.cancelled_caught:
-                        fails += 1
-                        log.warning(
-                            f'Data reset {name} timeout, retrying {i}.'
-                        )
-
-                        continue
-                else:
-
-                    log.warning('Sending CONNECTION RESET')
-                    await data_reset_hack(reset_type='connection')
-
-                    with trio.move_on_after(timeout) as cs:
-                        for name, ev in [
-                            # TODO: not sure if waiting on other events
-                            # is all that useful here or not. in theory
-                            # you could wait on one of the ones above
-                            # first to verify the reset request was
-                            # sent?
-                            ('history', hist_ev),
-                        ]:
-                            await ev.wait()
-                            log.info(f"{name} DATA RESET")
-
-                    if cs.cancelled_caught:
-                        fails += 1
-                        log.warning('Data CONNECTION RESET timeout!?')
-
-            else:
-                raise
-
-    return None, None
-    # else:  # throttle wasn't fixed so error out immediately
-    #     raise _err
-
-
-@acm
-async def open_history_client(
-    symbol: str,
-
-) -> tuple[Callable, int]:
-    '''
-    History retreival endpoint - delivers a historical frame callble
-    that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
-
-    '''
-    async with open_data_client() as proxy:
-
-        async def get_hist(
-            end_dt: Optional[datetime] = None,
-            start_dt: Optional[datetime] = None,
-
-        ) -> tuple[np.ndarray, str]:
-
-            out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
-
-            # TODO: add logic here to handle tradable hours and only grab
-            # valid bars in the range
-            if out is None:
-                # could be trying to retreive bars over weekend
-                log.error(f"Can't grab bars starting at {end_dt}!?!?")
-                raise NoData(
-                    f'{end_dt}',
-                    frame_size=2000,
-                )
-
-            bars, bars_array, first_dt, last_dt = out
-
-            # volume cleaning since there's -ve entries,
-            # wood luv to know what crookery that is..
-            vlm = bars_array['volume']
-            vlm[vlm < 0] = 0
-
-            return bars_array, first_dt, last_dt
-
-        # TODO: it seems like we can do async queries for ohlc
-        # but getting the order right still isn't working and I'm not
-        # quite sure why.. needs some tinkering and probably
-        # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
-        # we have to do the batch queries on the `asyncio` side?
-        yield get_hist, {'erlangs': 1, 'rate': 6}
-
-
-async def backfill_bars(
-
-    fqsn: str,
-    shm: ShmArray,  # type: ignore # noqa
-
-    # TODO: we want to avoid overrunning the underlying shm array buffer
-    # and we should probably calc the number of calls to make depending
-    # on that until we have the `marketstore` daemon in place in which
-    # case the shm size will be driven by user config and available sys
-    # memory.
-    count: int = 16,
-
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    '''
-    Fill historical bars into shared mem / storage afap.
-
-    TODO: avoid pacing constraints:
-    https://github.com/pikers/piker/issues/128
-
-    '''
-    # last_dt1 = None
-    last_dt = None
-
-    with trio.CancelScope() as cs:
-
-        async with open_data_client() as proxy:
-
-            out, fails = await get_bars(proxy, fqsn)
-
-            if out is None:
-                raise RuntimeError("Could not pull currrent history?!")
-
-            (first_bars, bars_array, first_dt, last_dt) = out
-            vlm = bars_array['volume']
-            vlm[vlm < 0] = 0
-            last_dt = first_dt
-
-            # write historical data to buffer
-            shm.push(bars_array)
-
-            task_status.started(cs)
-
-            i = 0
-            while i < count:
-
-                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
-
-                if out is None:
-                    # could be trying to retreive bars over weekend
-                    # TODO: add logic here to handle tradable hours and
-                    # only grab valid bars in the range
-                    log.error(f"Can't grab bars starting at {first_dt}!?!?")
-
-                    # XXX: get_bars() should internally decrement dt by
-                    # 2k seconds and try again.
-                    continue
-
-                (first_bars, bars_array, first_dt, last_dt) = out
-                # last_dt1 = last_dt
-                # last_dt = first_dt
-
-                # volume cleaning since there's -ve entries,
-                # wood luv to know what crookery that is..
-                vlm = bars_array['volume']
-                vlm[vlm < 0] = 0
-
-                # TODO we should probably dig into forums to see what peeps
-                # think this data "means" and then use it as an indicator of
-                # sorts? dinkus has mentioned that $vlms for the day dont'
-                # match other platforms nor the summary stat tws shows in
-                # the monitor - it's probably worth investigating.
-
-                shm.push(bars_array, prepend=True)
-                i += 1
-
-
-asset_type_map = {
-    'STK': 'stock',
-    'OPT': 'option',
-    'FUT': 'future',
-    'CONTFUT': 'continuous_future',
-    'CASH': 'forex',
-    'IND': 'index',
-    'CFD': 'cfd',
-    'BOND': 'bond',
-    'CMDTY': 'commodity',
-    'FOP': 'futures_option',
-    'FUND': 'mutual_fund',
-    'WAR': 'warrant',
-    'IOPT': 'warran',
-    'BAG': 'bag',
-    # 'NEWS': 'news',
-}
-
-
-_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
-
-
-async def _setup_quote_stream(
-
-    from_trio: asyncio.Queue,
-    to_trio: trio.abc.SendChannel,
-
-    symbol: str,
-    opts: tuple[int] = (
-        '375',  # RT trade volume (excludes utrades)
-        '233',  # RT trade volume (includes utrades)
-        '236',  # Shortable shares
-
-        # these all appear to only be updated every 25s thus
-        # making them mostly useless and explains why the scanner
-        # is always slow XD
-        # '293',  # Trade count for day
-        '294',  # Trade rate / minute
-        '295',  # Vlm rate / minute
-    ),
-    contract: Optional[Contract] = None,
-
-) -> trio.abc.ReceiveChannel:
-    '''
-    Stream a ticker using the std L1 api.
-
-    This task is ``asyncio``-side and must be called from
-    ``tractor.to_asyncio.open_channel_from()``.
-
-    '''
-    global _quote_streams
-
-    to_trio.send_nowait(None)
-
-    async with load_aio_clients() as accts2clients:
-        caccount_name, client = get_preferred_data_client(accts2clients)
-        contract = contract or (await client.find_contract(symbol))
-        ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
-
-        # NOTE: it's batch-wise and slow af but I guess could
-        # be good for backchecking? Seems to be every 5s maybe?
-        # ticker: Ticker = client.ib.reqTickByTickData(
-        #  contract, 'Last',
-        # )
-
-        # # define a simple queue push routine that streams quote packets
-        # # to trio over the ``to_trio`` memory channel.
-        # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore
-        def teardown():
-            ticker.updateEvent.disconnect(push)
-            log.error(f"Disconnected stream for `{symbol}`")
-            client.ib.cancelMktData(contract)
-
-            # decouple broadcast mem chan
-            _quote_streams.pop(symbol, None)
-
-        def push(t: Ticker) -> None:
-            """
-            Push quotes to trio task.
-
-            """
-            # log.debug(t)
-            try:
-                to_trio.send_nowait(t)
-
-            except (
-                trio.BrokenResourceError,
-
-                # XXX: HACK, not sure why this gets left stale (probably
-                # due to our terrible ``tractor.to_asyncio``
-                # implementation for streams.. but if the mem chan
-                # gets left here and starts blocking just kill the feed?
-                # trio.WouldBlock,
-            ):
-                # XXX: eventkit's ``Event.emit()`` for whatever redic
-                # reason will catch and ignore regular exceptions
-                # resulting in tracebacks spammed to console..
-                # Manually do the dereg ourselves.
-                teardown()
-            except trio.WouldBlock:
-                log.warning(
-                    f'channel is blocking symbol feed for {symbol}?'
-                    f'\n{to_trio.statistics}'
-                )
-
-            # except trio.WouldBlock:
-            #     # for slow debugging purposes to avoid clobbering prompt
-            #     # with log msgs
-            #     pass
-
-        ticker.updateEvent.connect(push)
-        try:
-            await asyncio.sleep(float('inf'))
-        finally:
-            teardown()
-
-    # return from_aio
-
-
-@acm
-async def open_aio_quote_stream(
-
-    symbol: str,
-    contract: Optional[Contract] = None,
-
-) -> trio.abc.ReceiveStream:
-
-    from tractor.trionics import broadcast_receiver
-    global _quote_streams
-
-    from_aio = _quote_streams.get(symbol)
-    if from_aio:
-
-        # if we already have a cached feed deliver a rx side clone to consumer
-        async with broadcast_receiver(
-            from_aio,
-            2**6,
-        ) as from_aio:
-            yield from_aio
-            return
-
-    async with tractor.to_asyncio.open_channel_from(
-        _setup_quote_stream,
-        symbol=symbol,
-        contract=contract,
-
-    ) as (first, from_aio):
-
-        # cache feed for later consumers
-        _quote_streams[symbol] = from_aio
-
-        yield from_aio
-
-
-async def stream_quotes(
-
-    send_chan: trio.abc.SendChannel,
-    symbols: list[str],
-    feed_is_live: trio.Event,
-    loglevel: str = None,
-
-    # startup sync
-    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    '''
-    Stream symbol quotes.
-
-    This is a ``trio`` callable routine meant to be invoked
-    once the brokerd is up.
-
-    '''
-    # TODO: support multiple subscriptions
-    sym = symbols[0]
-    log.info(f'request for real-time quotes: {sym}')
-
-    async with open_data_client() as proxy:
-
-        con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
-        first_quote = normalize(first_ticker)
-        # print(f'first quote: {first_quote}')
-
-        def mk_init_msgs() -> dict[str, dict]:
-            '''
-            Collect a bunch of meta-data useful for feed startup and
-            pack in a `dict`-msg.
-
-            '''
-            # pass back some symbol info like min_tick, trading_hours, etc.
-            syminfo = asdict(details)
-            syminfo.update(syminfo['contract'])
-
-            # nested dataclass we probably don't need and that won't IPC
-            # serialize
-            syminfo.pop('secIdList')
-
-            # TODO: more consistent field translation
-            atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
-
-            # for stocks it seems TWS reports too small a tick size
-            # such that you can't submit orders with that granularity?
-            min_tick = 0.01 if atype == 'stock' else 0
-
-            syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
-
-            # for "traditional" assets, volume is normally discreet, not
-            # a float
-            syminfo['lot_tick_size'] = 0.0
-
-            ibclient = proxy._aio_ns.ib.client
-            host, port = ibclient.host, ibclient.port
-
-            # TODO: for loop through all symbols passed in
-            init_msgs = {
-                # pass back token, and bool, signalling if we're the writer
-                # and that history has been written
-                sym: {
-                    'symbol_info': syminfo,
-                    'fqsn': first_quote['fqsn'],
-                },
-                'status': {
-                    'data_ep': f'{host}:{port}',
-                },
-
-            }
-            return init_msgs
-
-        init_msgs = mk_init_msgs()
-
-        # TODO: we should instead spawn a task that waits on a feed to start
-        # and let it wait indefinitely..instead of this hard coded stuff.
-        with trio.move_on_after(1):
-            contract, first_ticker, details = await proxy.get_quote(symbol=sym)
-
-        # it might be outside regular trading hours so see if we can at
-        # least grab history.
-        if isnan(first_ticker.last):
-            task_status.started((init_msgs, first_quote))
-
-            # it's not really live but this will unblock
-            # the brokerd feed task to tell the ui to update?
-            feed_is_live.set()
-
-            # block and let data history backfill code run.
-            await trio.sleep_forever()
-            return  # we never expect feed to come up?
-
-        async with open_aio_quote_stream(
-            symbol=sym,
-            contract=con,
-        ) as stream:
-
-            # ugh, clear ticks since we've consumed them
-            # (ahem, ib_insync is stateful trash)
-            first_ticker.ticks = []
-
-            task_status.started((init_msgs, first_quote))
-
-            async with aclosing(stream):
-                if type(first_ticker.contract) not in (
-                    ibis.Commodity,
-                    ibis.Forex
-                ):
-                    # wait for real volume on feed (trading might be closed)
-                    while True:
-                        ticker = await stream.receive()
-
-                        # for a real volume contract we rait for the first
-                        # "real" trade to take place
-                        if (
-                            # not calc_price
-                            # and not ticker.rtTime
-                            not ticker.rtTime
-                        ):
-                            # spin consuming tickers until we get a real
-                            # market datum
-                            log.debug(f"New unsent ticker: {ticker}")
-                            continue
-                        else:
-                            log.debug("Received first real volume tick")
-                            # ugh, clear ticks since we've consumed them
-                            # (ahem, ib_insync is truly stateful trash)
-                            ticker.ticks = []
-
-                            # XXX: this works because we don't use
-                            # ``aclosing()`` above?
-                            break
-
-                quote = normalize(ticker)
-                log.debug(f"First ticker received {quote}")
-
-                # tell caller quotes are now coming in live
-                feed_is_live.set()
-
-                # last = time.time()
-                async for ticker in stream:
-                    quote = normalize(ticker)
-                    await send_chan.send({quote['fqsn']: quote})
-
-                    # ugh, clear ticks since we've consumed them
-                    ticker.ticks = []
-                    # last = time.time()
-
-
 def pack_position(
     pos: Position
 
@@ -2461,156 +1725,6 @@ async def deliver_trade_events(
             await ems_stream.send(msg.dict())
 
 
-@tractor.context
-async def open_symbol_search(
-    ctx: tractor.Context,
-
-) -> None:
-
-    # TODO: load user defined symbol set locally for fast search?
-    await ctx.started({})
-
-    async with open_data_client() as proxy:
-        async with ctx.open_stream() as stream:
-
-            last = time.time()
-
-            async for pattern in stream:
-                log.debug(f'received {pattern}')
-                now = time.time()
-
-                assert pattern, 'IB can not accept blank search pattern'
-
-                # throttle search requests to no faster then 1Hz
-                diff = now - last
-                if diff < 1.0:
-                    log.debug('throttle sleeping')
-                    await trio.sleep(diff)
-                    try:
-                        pattern = stream.receive_nowait()
-                    except trio.WouldBlock:
-                        pass
-
-                if not pattern or pattern.isspace():
-                    log.warning('empty pattern received, skipping..')
-
-                    # TODO: *BUG* if nothing is returned here the client
-                    # side will cache a null set result and not showing
-                    # anything to the use on re-searches when this query
-                    # timed out. We probably need a special "timeout" msg
-                    # or something...
-
-                    # XXX: this unblocks the far end search task which may
-                    # hold up a multi-search nursery block
-                    await stream.send({})
-
-                    continue
-
-                log.debug(f'searching for {pattern}')
-
-                last = time.time()
-
-                # async batch search using api stocks endpoint and module
-                # defined adhoc symbol set.
-                stock_results = []
-
-                async def stash_results(target: Awaitable[list]):
-                    stock_results.extend(await target)
-
-                async with trio.open_nursery() as sn:
-                    sn.start_soon(
-                        stash_results,
-                        proxy.search_symbols(
-                            pattern=pattern,
-                            upto=5,
-                        ),
-                    )
-
-                    # trigger async request
-                    await trio.sleep(0)
-
-                    # match against our ad-hoc set immediately
-                    adhoc_matches = fuzzy.extractBests(
-                        pattern,
-                        list(_adhoc_futes_set),
-                        score_cutoff=90,
-                    )
-                    log.info(f'fuzzy matched adhocs: {adhoc_matches}')
-                    adhoc_match_results = {}
-                    if adhoc_matches:
-                        # TODO: do we need to pull contract details?
-                        adhoc_match_results = {i[0]: {} for i in adhoc_matches}
-
-                log.debug(f'fuzzy matching stocks {stock_results}')
-                stock_matches = fuzzy.extractBests(
-                    pattern,
-                    stock_results,
-                    score_cutoff=50,
-                )
-
-                matches = adhoc_match_results | {
-                    item[0]: {} for item in stock_matches
-                }
-                # TODO: we used to deliver contract details
-                # {item[2]: item[0] for item in stock_matches}
-
-                log.debug(f"sending matches: {matches.keys()}")
-                await stream.send(matches)
-
-
-async def data_reset_hack(
-    reset_type: str = 'data',
-
-) -> None:
-    '''
-    Run key combos for resetting data feeds and yield back to caller
-    when complete.
-
-    This is a linux-only hack around:
-
-    https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
-
-    TODOs:
-        - a return type that hopefully determines if the hack was
-          successful.
-        - other OS support?
-        - integration with ``ib-gw`` run in docker + Xorg?
-
-    '''
-
-    async def vnc_click_hack(
-        reset_type: str = 'data'
-    ) -> None:
-        '''
-        Reset the data or netowork connection for the VNC attached
-        ib gateway using magic combos.
-
-        '''
-        key = {'data': 'f', 'connection': 'r'}[reset_type]
-
-        import asyncvnc
-
-        async with asyncvnc.connect(
-            'localhost',
-            port=3003,
-            # password='ibcansmbz',
-        ) as client:
-
-            # move to middle of screen
-            # 640x1800
-            client.mouse.move(
-                x=500,
-                y=500,
-            )
-            client.mouse.click()
-            client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked
-
-    await tractor.to_asyncio.run_task(vnc_click_hack)
-
-    # we don't really need the ``xdotool`` approach any more B)
-    return True
-
-
 def load_flex_trades(
     path: Optional[str] = None,
 
@@ -2680,7 +1794,7 @@ def load_flex_trades(
         try:
             config.write(section, 'trades')
         except KeyError:
-            import pdbpp; pdbpp.set_trace()
+            import pdbpp; pdbpp.set_trace()  # noqa
 
 
 if __name__ == '__main__':
diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py
new file mode 100644
index 00000000..d533af60
--- /dev/null
+++ b/piker/brokers/ib/feed.py
@@ -0,0 +1,938 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""
+Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
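+
+Endpoints defined here and re-exported via the ``.ib`` sub-pkg:
+``open_data_client``, ``open_history_client``, ``stream_quotes`` and
+``open_symbol_search``, plus the ``get_bars``/``backfill_bars`` history
+loaders.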
+
+"""
+from __future__ import annotations
+import asyncio
+from contextlib import asynccontextmanager as acm
+from dataclasses import asdict
+from datetime import datetime
+from math import isnan
+import time
+from typing import (
+    Callable,
+    Optional,
+    Awaitable,
+)
+
+from async_generator import aclosing
+from fuzzywuzzy import process as fuzzy
+import numpy as np
+import pendulum
+import tractor
+import trio
+from trio_typing import TaskStatus
+
+from piker.data._sharedmem import ShmArray
+from .._util import SymbolNotFound, NoData
+from .client import (
+    _adhoc_futes_set,
+    log,
+    load_aio_clients,
+    ibis,
+    MethodProxy,
+    open_client_proxies,
+    get_preferred_data_client,
+    Ticker,
+    RequestError,
+    Contract,
+)
+
+
+# https://interactivebrokers.github.io/tws-api/tick_types.html
+tick_types = {
+    77: 'trade',
+
+    # a "utrade" aka an off exchange "unreportable" (dark) vlm:
+    # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
+    48: 'dark_trade',
+
+    # standard L1 ticks
+    0: 'bsize',
+    1: 'bid',
+    2: 'ask',
+    3: 'asize',
+    4: 'last',
+    5: 'size',
+    8: 'volume',
+
+    # ``ib_insync`` already packs these into
+    # quotes under the following fields.
+    # 55: 'trades_per_min',  # `'tradeRate'`
+    # 56: 'vlm_per_min',  # `'volumeRate'`
+    # 89: 'shortable',  # `'shortableShares'`
+}
+
+
+@acm
+async def open_data_client() -> MethodProxy:
+    '''
+    Open the first found preferred "data client" as defined in the
+    user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
+    and deliver that client wrapped in a ``MethodProxy``.
+
+    '''
+    async with (
+        open_client_proxies() as (proxies, clients),
+    ):
+        account_name, client = get_preferred_data_client(clients)
+        proxy = proxies.get(f'ib.{account_name}')
+        if not proxy:
+            raise ValueError(
+                f'No preferred data client could be found for {account_name}!'
+            )
+
+        yield proxy
+
+
+@acm
+async def open_history_client(
+    symbol: str,
+
+) -> tuple[Callable, int]:
+    '''
+    History retrieval endpoint - delivers a historical frame callable
+    that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
+
+    '''
+    async with open_data_client() as proxy:
+
+        async def get_hist(
+            end_dt: Optional[datetime] = None,
+            start_dt: Optional[datetime] = None,
+
+        ) -> tuple[np.ndarray, str]:
+
+            out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
+
+            # TODO: add logic here to handle tradable hours and only grab
+            # valid bars in the range
+            if out is None:
+                # could be trying to retrieve bars over weekend
+                log.error(f"Can't grab bars starting at {end_dt}!?!?")
+                raise NoData(
+                    f'{end_dt}',
+                    frame_size=2000,
+                )
+
+            bars, bars_array, first_dt, last_dt = out
+
+            # volume cleaning since there's -ve entries,
+            # wood luv to know what crookery that is..
+            vlm = bars_array['volume']
+            vlm[vlm < 0] = 0
+
+            return bars_array, first_dt, last_dt
+
+        # TODO: it seems like we can do async queries for ohlc
+        # but getting the order right still isn't working and I'm not
+        # quite sure why.. needs some tinkering and probably
+        # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
+        # we have to do the batch queries on the `asyncio` side?
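+
+        # NOTE: the 2nd element yielded here is a throttle config for
+        # the consuming feed layer: 'erlangs' caps concurrent
+        # outstanding requests and 'rate' caps the request rate; a
+        # caller-side sketch (names hypothetical):
+        #
+        #   async with open_history_client(fqsn) as (hist, cfg):
+        #       array, first_dt, last_dt = await hist(end_dt=None)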
+        yield get_hist, {'erlangs': 1, 'rate': 6}
+
+
+_pacing: str = (
+    'Historical Market Data Service error '
+    'message:Historical data request pacing violation'
+)
+
+
+async def get_bars(
+
+    proxy: MethodProxy,
+    fqsn: str,
+
+    # blank to start which tells ib to look up the latest datum
+    end_dt: str = '',
+
+) -> (dict, np.ndarray):
+    '''
+    Retrieve historical data from a ``trio``-side task using
+    a ``MethodProxy``.
+
+    '''
+    fails = 0
+    bars: Optional[list] = None
+    first_dt: datetime = None
+    last_dt: datetime = None
+
+    if end_dt:
+        last_dt = pendulum.from_timestamp(end_dt.timestamp())
+
+    for _ in range(10):
+        try:
+            out = await proxy.bars(
+                fqsn=fqsn,
+                end_dt=end_dt,
+            )
+            if out:
+                bars, bars_array = out
+
+            else:
+                await tractor.breakpoint()
+
+            if bars_array is None:
+                raise SymbolNotFound(fqsn)
+
+            first_dt = pendulum.from_timestamp(
+                bars[0].date.timestamp())
+
+            last_dt = pendulum.from_timestamp(
+                bars[-1].date.timestamp())
+
+            time = bars_array['time']
+            assert time[-1] == last_dt.timestamp()
+            assert time[0] == first_dt.timestamp()
+            log.info(
+                f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
+            )
+
+            return (bars, bars_array, first_dt, last_dt), fails
+
+        except RequestError as err:
+            msg = err.message
+            # why do we always need to rebind this?
+            # _err = err
+
+            if 'No market data permissions for' in msg:
+                # TODO: signalling for no permissions searches
+                raise NoData(
+                    f'Symbol: {fqsn}',
+                )
+
+            elif (
+                err.code == 162
+                and 'HMDS query returned no data' in err.message
+            ):
+                # XXX: this is now done in the storage mgmt layer
+                # and we shouldn't implicitly decrement the frame dt
+                # index since the upper layer may be doing so
+                # concurrently and we don't want to be delivering frames
+                # that weren't asked for.
+                log.warning(
+                    f'NO DATA found ending @ {end_dt}\n'
+                )
+
+                # try to decrement start point and look further back
+                # end_dt = last_dt = last_dt.subtract(seconds=2000)
+
+                raise NoData(
+                    f'Symbol: {fqsn}',
+                    frame_size=2000,
+                )
+
+            elif _pacing in msg:
+
+                log.warning(
+                    'History throttle rate reached!\n'
+                    'Resetting farms with `ctrl-alt-f` hack\n'
+                )
+                # TODO: we might have to put a task lock around this
+                # method..
+                hist_ev = proxy.status_event(
+                    'HMDS data farm connection is OK:ushmds'
+                )
+
+                # XXX: other event messages we might want to try and
+                # wait for but i wasn't able to get any of this
+                # reliable..
+                # reconnect_start = proxy.status_event(
+                #     'Market data farm is connecting:usfuture'
+                # )
+                # live_ev = proxy.status_event(
+                #     'Market data farm connection is OK:usfuture'
+                # )
+
+                # try to wait on the reset event(s) to arrive, a timeout
+                # will trigger a retry up to 6 times (for now).
+                tries: int = 2
+                timeout: float = 10
+
+                # try a few times with a data reset then fail over to
+                # a connection reset.
+                for i in range(1, tries):
+
+                    log.warning('Sending DATA RESET request')
+                    await data_reset_hack(reset_type='data')
+
+                    with trio.move_on_after(timeout) as cs:
+                        for name, ev in [
+                            # TODO: not sure if waiting on other events
+                            # is all that useful here or not. in theory
+                            # you could wait on one of the ones above
+                            # first to verify the reset request was
+                            # sent?
+                            ('history', hist_ev),
+                        ]:
+                            await ev.wait()
+                            log.info(f"{name} DATA RESET")
+                            break
+
+                    if cs.cancelled_caught:
+                        fails += 1
+                        log.warning(
+                            f'Data reset {name} timeout, retrying {i}.'
+                        )
+
+                        continue
+                else:
+
+                    log.warning('Sending CONNECTION RESET')
+                    await data_reset_hack(reset_type='connection')
+
+                    with trio.move_on_after(timeout) as cs:
+                        for name, ev in [
+                            # TODO: not sure if waiting on other events
+                            # is all that useful here or not. in theory
+                            # you could wait on one of the ones above
+                            # first to verify the reset request was
+                            # sent?
+                            ('history', hist_ev),
+                        ]:
+                            await ev.wait()
+                            log.info(f"{name} DATA RESET")
+
+                    if cs.cancelled_caught:
+                        fails += 1
+                        log.warning('Data CONNECTION RESET timeout!?')
+
+            else:
+                raise
+
+    return None, None
+    # else:  # throttle wasn't fixed so error out immediately
+    #     raise _err
+
+
+async def backfill_bars(
+
+    fqsn: str,
+    shm: ShmArray,  # type: ignore # noqa
+
+    # TODO: we want to avoid overrunning the underlying shm array buffer
+    # and we should probably calc the number of calls to make depending
+    # on that until we have the `marketstore` daemon in place in which
+    # case the shm size will be driven by user config and available sys
+    # memory.
+    count: int = 16,
+
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Fill historical bars into shared mem / storage afap.
+
+    TODO: avoid pacing constraints:
+    https://github.com/pikers/piker/issues/128
+
+    '''
+    # last_dt1 = None
+    last_dt = None
+
+    with trio.CancelScope() as cs:
+
+        async with open_data_client() as proxy:
+
+            out, fails = await get_bars(proxy, fqsn)
+
+            if out is None:
+                raise RuntimeError("Could not pull current history?!")
+
+            (first_bars, bars_array, first_dt, last_dt) = out
+            vlm = bars_array['volume']
+            vlm[vlm < 0] = 0
+            last_dt = first_dt
+
+            # write historical data to buffer
+            shm.push(bars_array)
+
+            task_status.started(cs)
+
+            i = 0
+            while i < count:
+
+                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
+
+                if out is None:
+                    # could be trying to retrieve bars over weekend
+                    # TODO: add logic here to handle tradable hours and
+                    # only grab valid bars in the range
+                    log.error(f"Can't grab bars starting at {first_dt}!?!?")
+
+                    # XXX: get_bars() should internally decrement dt by
+                    # 2k seconds and try again.
+                    continue
+
+                (first_bars, bars_array, first_dt, last_dt) = out
+                # last_dt1 = last_dt
+                # last_dt = first_dt
+
+                # volume cleaning since there's -ve entries,
+                # wood luv to know what crookery that is..
+                vlm = bars_array['volume']
+                vlm[vlm < 0] = 0
+
+                # TODO we should probably dig into forums to see what peeps
+                # think this data "means" and then use it as an indicator of
+                # sorts? dinkus has mentioned that $vlms for the day don't
+                # match other platforms nor the summary stat tws shows in
+                # the monitor - it's probably worth investigating.
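+                #
+                # NOTE: each older frame is written *in front of* the
+                # existing buffer contents (``prepend=True`` below)
+                # since we're walking history backwards from the most
+                # recent frame via ``end_dt=first_dt``.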
+
+                shm.push(bars_array, prepend=True)
+                i += 1
+
+
+asset_type_map = {
+    'STK': 'stock',
+    'OPT': 'option',
+    'FUT': 'future',
+    'CONTFUT': 'continuous_future',
+    'CASH': 'forex',
+    'IND': 'index',
+    'CFD': 'cfd',
+    'BOND': 'bond',
+    'CMDTY': 'commodity',
+    'FOP': 'futures_option',
+    'FUND': 'mutual_fund',
+    'WAR': 'warrant',
+    'IOPT': 'warrant',
+    'BAG': 'bag',
+    # 'NEWS': 'news',
+}
+
+
+_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
+
+
+async def _setup_quote_stream(
+
+    from_trio: asyncio.Queue,
+    to_trio: trio.abc.SendChannel,
+
+    symbol: str,
+    opts: tuple[str, ...] = (
+        '375',  # RT trade volume (excludes utrades)
+        '233',  # RT trade volume (includes utrades)
+        '236',  # Shortable shares
+
+        # these all appear to only be updated every 25s thus
+        # making them mostly useless and explains why the scanner
+        # is always slow XD
+        # '293',  # Trade count for day
+        '294',  # Trade rate / minute
+        '295',  # Vlm rate / minute
+    ),
+    contract: Optional[Contract] = None,
+
+) -> trio.abc.ReceiveChannel:
+    '''
+    Stream a ticker using the std L1 api.
+
+    This task is ``asyncio``-side and must be called from
+    ``tractor.to_asyncio.open_channel_from()``.
+
+    '''
+    global _quote_streams
+
+    to_trio.send_nowait(None)
+
+    async with load_aio_clients() as accts2clients:
+        caccount_name, client = get_preferred_data_client(accts2clients)
+        contract = contract or (await client.find_contract(symbol))
+        ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
+
+        # NOTE: it's batch-wise and slow af but I guess could
+        # be good for backchecking? Seems to be every 5s maybe?
+        # ticker: Ticker = client.ib.reqTickByTickData(
+        #  contract, 'Last',
+        # )
+
+        # # define a simple queue push routine that streams quote packets
+        # # to trio over the ``to_trio`` memory channel.
+        # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore
+        def teardown():
+            ticker.updateEvent.disconnect(push)
+            log.error(f"Disconnected stream for `{symbol}`")
+            client.ib.cancelMktData(contract)
+
+            # decouple broadcast mem chan
+            _quote_streams.pop(symbol, None)
+
+        def push(t: Ticker) -> None:
+            """
+            Push quotes to trio task.
+
+            """
+            # log.debug(t)
+            try:
+                to_trio.send_nowait(t)
+
+            except (
+                trio.BrokenResourceError,
+
+                # XXX: HACK, not sure why this gets left stale (probably
+                # due to our terrible ``tractor.to_asyncio``
+                # implementation for streams.. but if the mem chan
+                # gets left here and starts blocking just kill the feed?
+                # trio.WouldBlock,
+            ):
+                # XXX: eventkit's ``Event.emit()`` for whatever redic
+                # reason will catch and ignore regular exceptions
+                # resulting in tracebacks spammed to console..
+                # Manually do the dereg ourselves.
+                teardown()
+            except trio.WouldBlock:
+                log.warning(
+                    f'channel is blocking symbol feed for {symbol}?'
+                    f'\n{to_trio.statistics()}'
+                )
+
+            # except trio.WouldBlock:
+            #     # for slow debugging purposes to avoid clobbering prompt
+            #     # with log msgs
+            #     pass
+
+        ticker.updateEvent.connect(push)
+        try:
+            await asyncio.sleep(float('inf'))
+        finally:
+            teardown()
+
+    # return from_aio
+
+
+@acm
+async def open_aio_quote_stream(
+
+    symbol: str,
+    contract: Optional[Contract] = None,
+
+) -> trio.abc.ReceiveStream:
+
+    from tractor.trionics import broadcast_receiver
+    global _quote_streams
+
+    from_aio = _quote_streams.get(symbol)
+    if from_aio:
+
+        # if we already have a cached feed deliver a rx side clone to consumer
+        async with broadcast_receiver(
+            from_aio,
+            2**6,
+        ) as from_aio:
+            yield from_aio
+            return
+
+    async with tractor.to_asyncio.open_channel_from(
+        _setup_quote_stream,
+        symbol=symbol,
+        contract=contract,
+
+    ) as (first, from_aio):
+
+        # cache feed for later consumers
+        _quote_streams[symbol] = from_aio
+
+        yield from_aio
+
+
+# TODO: cython/mypyc/numba this!
+def normalize(
+    ticker: Ticker,
+    calc_price: bool = False
+
+) -> dict:
+
+    # should be real volume for this contract by default
+    calc_price = False
+
+    # check for special contract types
+    con = ticker.contract
+    if type(con) in (
+        ibis.Commodity,
+        ibis.Forex,
+    ):
+        # commodities and forex don't have an exchange name and
+        # no real volume so we have to calculate the price
+        suffix = con.secType
+        # no real volume on this contract
+        calc_price = True
+
+    else:
+        suffix = con.primaryExchange
+        if not suffix:
+            suffix = con.exchange
+
+    # append a `.` to the returned symbol
+    # key for derivatives that normally is the expiry
+    # date key.
+    expiry = con.lastTradeDateOrContractMonth
+    if expiry:
+        suffix += f'.{expiry}'
+
+    # convert named tuples to dicts so we send usable keys
+    new_ticks = []
+    for tick in ticker.ticks:
+        if tick and not isinstance(tick, dict):
+            td = tick._asdict()
+            td['type'] = tick_types.get(
+                td['tickType'],
+                'n/a',
+            )
+
+            new_ticks.append(td)
+
+    tbt = ticker.tickByTicks
+    if tbt:
+        print(f'tickbyticks:\n {ticker.tickByTicks}')
+
+    ticker.ticks = new_ticks
+
+    # some contracts don't have volume so we may want to calculate
+    # a midpoint price based on data we can acquire (such as bid / ask)
+    if calc_price:
+        ticker.ticks.append(
+            {'type': 'trade', 'price': ticker.marketPrice()}
+        )
+
+    # serialize for transport
+    data = asdict(ticker)
+
+    # generate fqsn with possible specialized suffix
+    # for derivatives, note the lowercase.
+    data['symbol'] = data['fqsn'] = '.'.join(
+        (con.symbol, suffix)
+    ).lower()
+
+    # convert named tuples to dicts for transport
+    tbts = data.get('tickByTicks')
+    if tbts:
+        data['tickByTicks'] = [tbt._asdict() for tbt in tbts]
+
+    # add time stamps for downstream latency measurements
+    data['brokerd_ts'] = time.time()
+
+    # stupid stupid shit...don't even care any more..
+    # leave it until we do a proper latency study
+    # if ticker.rtTime is not None:
+    #     data['broker_ts'] = data['rtTime_s'] = float(
+    #         ticker.rtTime.timestamp) / 1000.
+    data.pop('rtTime')
+
+    return data
+
+
+async def stream_quotes(
+
+    send_chan: trio.abc.SendChannel,
+    symbols: list[str],
+    feed_is_live: trio.Event,
+    loglevel: str = None,
+
+    # startup sync
+    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Stream symbol quotes.
+
+    This is a ``trio`` callable routine meant to be invoked
+    once the brokerd is up.
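+
+    Startup sequencing: a ``(init_msgs, first_quote)`` pair is
+    reported back via ``task_status.started()`` once symbol details
+    are loaded, while ``feed_is_live`` is only set after a first real
+    market datum arrives (or immediately when outside trading hours).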
+
+    '''
+    # TODO: support multiple subscriptions
+    sym = symbols[0]
+    log.info(f'request for real-time quotes: {sym}')
+
+    async with open_data_client() as proxy:
+
+        con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
+        first_quote = normalize(first_ticker)
+        # print(f'first quote: {first_quote}')
+
+        def mk_init_msgs() -> dict[str, dict]:
+            '''
+            Collect a bunch of meta-data useful for feed startup and
+            pack in a `dict`-msg.
+
+            '''
+            # pass back some symbol info like min_tick, trading_hours, etc.
+            syminfo = asdict(details)
+            syminfo.update(syminfo['contract'])
+
+            # nested dataclass we probably don't need and that won't IPC
+            # serialize
+            syminfo.pop('secIdList')
+
+            # TODO: more consistent field translation
+            atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
+
+            # for stocks it seems TWS reports too small a tick size
+            # such that you can't submit orders with that granularity?
+            min_tick = 0.01 if atype == 'stock' else 0
+
+            syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
+
+            # for "traditional" assets, volume is normally discrete, not
+            # a float
+            syminfo['lot_tick_size'] = 0.0
+
+            ibclient = proxy._aio_ns.ib.client
+            host, port = ibclient.host, ibclient.port
+
+            # TODO: for loop through all symbols passed in
+            init_msgs = {
+                # pass back token, and bool, signalling if we're the writer
+                # and that history has been written
+                sym: {
+                    'symbol_info': syminfo,
+                    'fqsn': first_quote['fqsn'],
+                },
+                'status': {
+                    'data_ep': f'{host}:{port}',
+                },
+
+            }
+            return init_msgs
+
+        init_msgs = mk_init_msgs()
+
+        # TODO: we should instead spawn a task that waits on a feed to start
+        # and let it wait indefinitely..instead of this hard coded stuff.
+        with trio.move_on_after(1):
+            contract, first_ticker, details = await proxy.get_quote(symbol=sym)
+
+        # it might be outside regular trading hours so see if we can at
+        # least grab history.
+        if isnan(first_ticker.last):
+            task_status.started((init_msgs, first_quote))
+
+            # it's not really live but this will unblock
+            # the brokerd feed task to tell the ui to update?
+            feed_is_live.set()
+
+            # block and let data history backfill code run.
+            await trio.sleep_forever()
+            return  # we never expect feed to come up?
+
+        async with open_aio_quote_stream(
+            symbol=sym,
+            contract=con,
+        ) as stream:
+
+            # ugh, clear ticks since we've consumed them
+            # (ahem, ib_insync is stateful trash)
+            first_ticker.ticks = []
+
+            task_status.started((init_msgs, first_quote))
+
+            async with aclosing(stream):
+                if type(first_ticker.contract) not in (
+                    ibis.Commodity,
+                    ibis.Forex
+                ):
+                    # wait for real volume on feed (trading might be closed)
+                    while True:
+                        ticker = await stream.receive()
+
+                        # for a real volume contract we wait for the first
+                        # "real" trade to take place
+                        if (
+                            # not calc_price
+                            # and not ticker.rtTime
+                            not ticker.rtTime
+                        ):
+                            # spin consuming tickers until we get a real
+                            # market datum
+                            log.debug(f"New unsent ticker: {ticker}")
+                            continue
+                        else:
+                            log.debug("Received first real volume tick")
+                            # ugh, clear ticks since we've consumed them
+                            # (ahem, ib_insync is truly stateful trash)
+                            ticker.ticks = []
+
+                            # XXX: this works because we don't use
+                            # ``aclosing()`` above?
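+                            # a real (volume carrying) tick has now
+                            # arrived so exit this warm-up loop and
+                            # start relaying quotes downstream.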
+                            break
+
+                quote = normalize(ticker)
+                log.debug(f"First ticker received {quote}")
+
+                # tell caller quotes are now coming in live
+                feed_is_live.set()
+
+                # last = time.time()
+                async for ticker in stream:
+                    quote = normalize(ticker)
+                    await send_chan.send({quote['fqsn']: quote})
+
+                    # ugh, clear ticks since we've consumed them
+                    ticker.ticks = []
+                    # last = time.time()
+
+
+async def data_reset_hack(
+    reset_type: str = 'data',
+
+) -> None:
+    '''
+    Run key combos for resetting data feeds and yield back to caller
+    when complete.
+
+    This is a linux-only hack around:
+
+    https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
+
+    TODOs:
+        - a return type that hopefully determines if the hack was
+          successful.
+        - other OS support?
+        - integration with ``ib-gw`` run in docker + Xorg?
+
+    '''
+
+    async def vnc_click_hack(
+        reset_type: str = 'data'
+    ) -> None:
+        '''
+        Reset the data or network connection for the VNC attached
+        ib gateway using magic combos.
+
+        '''
+        key = {'data': 'f', 'connection': 'r'}[reset_type]
+
+        import asyncvnc
+
+        async with asyncvnc.connect(
+            'localhost',
+            port=3003,
+            # password='ibcansmbz',
+        ) as client:
+
+            # move to middle of screen
+            # 640x1800
+            client.mouse.move(
+                x=500,
+                y=500,
+            )
+            client.mouse.click()
+            client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked
+
+    await tractor.to_asyncio.run_task(vnc_click_hack)
+
+    # we don't really need the ``xdotool`` approach any more B)
+    return True
+
+
+@tractor.context
+async def open_symbol_search(
+    ctx: tractor.Context,
+
+) -> None:
+
+    # TODO: load user defined symbol set locally for fast search?
+    await ctx.started({})
+
+    async with open_data_client() as proxy:
+        async with ctx.open_stream() as stream:
+
+            last = time.time()
+
+            async for pattern in stream:
+                log.debug(f'received {pattern}')
+                now = time.time()
+
+                assert pattern, 'IB can not accept blank search pattern'
+
+                # throttle search requests to no faster than 1Hz
+                diff = now - last
+                if diff < 1.0:
+                    log.debug('throttle sleeping')
+                    await trio.sleep(diff)
+                    try:
+                        pattern = stream.receive_nowait()
+                    except trio.WouldBlock:
+                        pass
+
+                if not pattern or pattern.isspace():
+                    log.warning('empty pattern received, skipping..')
+
+                    # TODO: *BUG* if nothing is returned here the client
+                    # side will cache a null set result and not show
+                    # anything to the user on re-searches when this query
+                    # timed out. We probably need a special "timeout" msg
+                    # or something...
+
+                    # XXX: this unblocks the far end search task which may
+                    # hold up a multi-search nursery block
+                    await stream.send({})
+
+                    continue
+
+                log.debug(f'searching for {pattern}')
+
+                last = time.time()
+
+                # async batch search using api stocks endpoint and module
+                # defined adhoc symbol set.
+                stock_results = []
+
+                async def stash_results(target: Awaitable[list]):
+                    stock_results.extend(await target)
+
+                async with trio.open_nursery() as sn:
+                    sn.start_soon(
+                        stash_results,
+                        proxy.search_symbols(
+                            pattern=pattern,
+                            upto=5,
+                        ),
+                    )
+
+                    # trigger async request
+                    await trio.sleep(0)
+
+                    # match against our ad-hoc set immediately
+                    adhoc_matches = fuzzy.extractBests(
+                        pattern,
+                        list(_adhoc_futes_set),
+                        score_cutoff=90,
+                    )
+                    log.info(f'fuzzy matched adhocs: {adhoc_matches}')
+                    adhoc_match_results = {}
+                    if adhoc_matches:
+                        # TODO: do we need to pull contract details?
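+                        # ``fuzzy.extractBests()`` yields ``(match,
+                        # score)`` pairs, e.g. something like
+                        # ``[('mes.globex', 95), ...]`` (symbol and
+                        # score illustrative), hence the ``i[0]`` key
+                        # extraction below.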
+                        adhoc_match_results = {i[0]: {} for i in adhoc_matches}
+
+                log.debug(f'fuzzy matching stocks {stock_results}')
+                stock_matches = fuzzy.extractBests(
+                    pattern,
+                    stock_results,
+                    score_cutoff=50,
+                )
+
+                matches = adhoc_match_results | {
+                    item[0]: {} for item in stock_matches
+                }
+                # TODO: we used to deliver contract details
+                # {item[2]: item[0] for item in stock_matches}
+
+                log.debug(f"sending matches: {matches.keys()}")
+                await stream.send(matches)
diff --git a/piker/data/feed.py b/piker/data/feed.py
index 1165fddc..66ced72f 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -700,6 +700,7 @@ async def manage_history(
     bfqsn = fqsn.replace('.' + mod.name, '')
 
     open_history_client = getattr(mod, 'open_history_client', None)
+    assert open_history_client
 
     if is_up and opened and open_history_client:
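
Usage sketch for the relocated endpoints (illustrative only: the fqsn,
the ``trio`` wiring and the printed fields are assumptions, not part of
this patch):

    import trio
    from piker.brokers.ib import feed

    async def main():
        # open a throttled history endpoint for a hypothetical contract
        async with feed.open_history_client('mes.globex') as (hist, cfg):
            array, first_dt, last_dt = await hist(end_dt=None)
            print(f'{len(array)} bars covering {first_dt} -> {last_dt}')

    trio.run(main)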