From 99eabe34c928b39381e4f717608ad62e7986fe69 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 14:51:18 -0400 Subject: [PATCH 1/6] Convert `ib` backend into sub-package The single-file module was getting way out of hand size-wise with the new flex report parsing stuff so this starts the process of breaking things up into smaller modules oriented around trade, data, and ledger related endpoints. Add support for backends to declare sub-modules to enable in a `__enable_modules__: list[str]` module var which is parsed by the daemon spawning code passed to `tractor`'s `enable_modules: list[str]` input. --- piker/_daemon.py | 12 +++++- piker/brokers/ib/__init__.py | 54 +++++++++++++++++++++++++++ piker/brokers/{ib.py => ib/client.py} | 30 +++++---------- 3 files changed, 74 insertions(+), 22 deletions(-) create mode 100644 piker/brokers/ib/__init__.py rename piker/brokers/{ib.py => ib/client.py} (99%) diff --git a/piker/_daemon.py b/piker/_daemon.py index 999b8fce..e732a15d 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -426,9 +426,19 @@ async def spawn_brokerd( # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery + modpath = brokermod.__name__ + broker_enable = [modpath] + for submodname in getattr( + brokermod, + '__enable_modules__', + [], + ): + subpath = f'{modpath}.{submodname}' + broker_enable.append(subpath) + portal = await _services.actor_n.start_actor( dname, - enable_modules=_data_mods + [brokermod.__name__], + enable_modules=_data_mods + broker_enable, loglevel=loglevel, debug_mode=_services.debug_mode, **tractor_kwargs diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py new file mode 100644 index 00000000..877b0667 --- /dev/null +++ b/piker/brokers/ib/__init__.py @@ -0,0 +1,54 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Interactive Brokers API backend. + +Sub-modules within break into the core functionalities: + +- ``broker.py`` part for orders / trading endpoints +- ``data.py`` for real-time data feed endpoints + +- ``client.py`` for the core API machinery which is ``trio``-ized + wrapping around ``ib_insync``. + +- ``report.py`` for the hackery to build manual pp calcs + to avoid ib's absolute bullshit FIFO style position + tracking.. + +""" +from .client import ( + get_client, + open_history_client, + open_symbol_search, + stream_quotes, + trades_dialogue, +) + +# tractor RPC enable arg +__enable_modules__: list[str] = [ + 'client', +] + +# passed to ``tractor.ActorNursery.start_actor()`` +_spawn_kwargs = { + 'infect_asyncio': True, +} + +# annotation to let backend agnostic code +# know if ``brokerd`` should be spawned with +# ``tractor``'s aio mode. 
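+# a backend-agnostic consumer might check it like so (hypothetical
+# sketch only; the real spawn logic lives in ``piker._daemon``):
+#
+#   if getattr(brokermod, '_infect_asyncio', False):
+#       tractor_kwargs['infect_asyncio'] = True
+#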
+_infect_asyncio: bool = True diff --git a/piker/brokers/ib.py b/piker/brokers/ib/client.py similarity index 99% rename from piker/brokers/ib.py rename to piker/brokers/ib/client.py index 9b32521f..cbbbd8d4 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib/client.py @@ -15,11 +15,8 @@ # along with this program. If not, see . """ -Interactive Brokers API backend. - -Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is -built on it) and thus actor aware API calls must be spawned with -``infected_aio==True``. +``ib`` core API client machinery; mostly sane wrapping around +``ib_insync``. """ from __future__ import annotations @@ -61,12 +58,12 @@ import numpy as np import pendulum -from .. import config -from ..log import get_logger, get_console_log -from ..data._source import base_ohlc_dtype -from ..data._sharedmem import ShmArray -from ._util import SymbolNotFound, NoData -from ..clearing._messages import ( +from piker import config +from piker.log import get_logger, get_console_log +from piker.data._source import base_ohlc_dtype +from piker.data._sharedmem import ShmArray +from .._util import SymbolNotFound, NoData +from piker.clearing._messages import ( BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdPosition, BrokerdCancel, BrokerdFill, BrokerdError, @@ -75,10 +72,7 @@ from ..clearing._messages import ( log = get_logger(__name__) -# passed to ``tractor.ActorNursery.start_actor()`` -_spawn_kwargs = { - 'infect_asyncio': True, -} + _time_units = { 's': ' sec', 'm': ' mins', @@ -118,12 +112,6 @@ _search_conf = { } -# annotation to let backend agnostic code -# know if ``brokerd`` should be spawned with -# ``tractor``'s aio mode. -_infect_asyncio: bool = True - - # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: class NonShittyWrapper(Wrapper): From 1c1661b783b9a5c940fea57da882fd99fbe0cbf2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 15:27:05 -0400 Subject: [PATCH 2/6] Factor all data feed endpoints into `.ib.feed.py` --- piker/brokers/ib/__init__.py | 14 +- piker/brokers/ib/client.py | 898 +-------------------------------- piker/brokers/ib/feed.py | 938 +++++++++++++++++++++++++++++++++++ piker/data/feed.py | 1 + 4 files changed, 958 insertions(+), 893 deletions(-) create mode 100644 piker/brokers/ib/feed.py diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 877b0667..1d6bac33 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -32,15 +32,27 @@ Sub-modules within break into the core functionalities: """ from .client import ( get_client, + trades_dialogue, +) +from .feed import ( open_history_client, open_symbol_search, stream_quotes, - trades_dialogue, ) +__all__ = [ + 'get_client', + 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', +] + + # tractor RPC enable arg __enable_modules__: list[str] = [ 'client', + 'feed', ] # passed to ``tractor.ActorNursery.start_actor()`` diff --git a/piker/brokers/ib/client.py b/piker/brokers/ib/client.py index cbbbd8d4..43626c04 100644 --- a/piker/brokers/ib/client.py +++ b/piker/brokers/ib/client.py @@ -28,8 +28,9 @@ from functools import partial import itertools from math import isnan from typing import ( - Any, Callable, Optional, - AsyncIterator, Awaitable, + Any, + Optional, + AsyncIterator, Union, ) import asyncio @@ -43,7 +44,6 @@ import trio from trio_typing import TaskStatus import tractor from tractor import to_asyncio -from async_generator import aclosing from ib_insync.wrapper 
import RequestError from ib_insync.contract import Contract, ContractDetails, Option from ib_insync.order import Order, Trade, OrderStatus @@ -53,16 +53,12 @@ from ib_insync.objects import Position import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client -from fuzzywuzzy import process as fuzzy import numpy as np -import pendulum from piker import config from piker.log import get_logger, get_console_log from piker.data._source import base_ohlc_dtype -from piker.data._sharedmem import ShmArray -from .._util import SymbolNotFound, NoData from piker.clearing._messages import ( BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdPosition, BrokerdCancel, @@ -1151,27 +1147,6 @@ def get_preferred_data_client( ) -@acm -async def open_data_client() -> MethodProxy: - ''' - Open the first found preferred "data client" as defined in the - user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable - and deliver that client wrapped in a ``MethodProxy``. - - ''' - async with ( - open_client_proxies() as (proxies, clients), - ): - account_name, client = get_preferred_data_client(clients) - proxy = proxies.get(f'ib.{account_name}') - if not proxy: - raise ValueError( - f'No preferred data client could be found for {account_name}!' - ) - - yield proxy - - class MethodProxy: def __init__( @@ -1355,725 +1330,14 @@ async def get_client( a method proxy to it. ''' + from .feed import open_data_client + # TODO: the IPC via portal relay layer for when this current # actor isn't in aio mode. async with open_data_client() as proxy: yield proxy -# https://interactivebrokers.github.io/tws-api/tick_types.html -tick_types = { - 77: 'trade', - - # a "utrade" aka an off exchange "unreportable" (dark) vlm: - # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume - 48: 'dark_trade', - - # standard L1 ticks - 0: 'bsize', - 1: 'bid', - 2: 'ask', - 3: 'asize', - 4: 'last', - 5: 'size', - 8: 'volume', - - # ``ib_insync`` already packs these into - # quotes under the following fields. - # 55: 'trades_per_min', # `'tradeRate'` - # 56: 'vlm_per_min', # `'volumeRate'` - # 89: 'shortable', # `'shortableShares'` -} - - -# TODO: cython/mypyc/numba this! -def normalize( - ticker: Ticker, - calc_price: bool = False - -) -> dict: - - # should be real volume for this contract by default - calc_price = False - - # check for special contract types - con = ticker.contract - if type(con) in ( - ibis.Commodity, - ibis.Forex, - ): - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True - - else: - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange - - # append a `.` to the returned symbol - # key for derivatives that normally is the expiry - # date key. 
- expiry = con.lastTradeDateOrContractMonth - if expiry: - suffix += f'.{expiry}' - - # convert named tuples to dicts so we send usable keys - new_ticks = [] - for tick in ticker.ticks: - if tick and not isinstance(tick, dict): - td = tick._asdict() - td['type'] = tick_types.get( - td['tickType'], - 'n/a', - ) - - new_ticks.append(td) - - tbt = ticker.tickByTicks - if tbt: - print(f'tickbyticks:\n {ticker.tickByTicks}') - - ticker.ticks = new_ticks - - # some contracts don't have volume so we may want to calculate - # a midpoint price based on data we can acquire (such as bid / ask) - if calc_price: - ticker.ticks.append( - {'type': 'trade', 'price': ticker.marketPrice()} - ) - - # serialize for transport - data = asdict(ticker) - - # generate fqsn with possible specialized suffix - # for derivatives, note the lowercase. - data['symbol'] = data['fqsn'] = '.'.join( - (con.symbol, suffix) - ).lower() - - # convert named tuples to dicts for transport - tbts = data.get('tickByTicks') - if tbts: - data['tickByTicks'] = [tbt._asdict() for tbt in tbts] - - # add time stamps for downstream latency measurements - data['brokerd_ts'] = time.time() - - # stupid stupid shit...don't even care any more.. - # leave it until we do a proper latency study - # if ticker.rtTime is not None: - # data['broker_ts'] = data['rtTime_s'] = float( - # ticker.rtTime.timestamp) / 1000. - data.pop('rtTime') - - return data - - -_pacing: str = ( - 'Historical Market Data Service error ' - 'message:Historical data request pacing violation' -) - - -async def get_bars( - - proxy: MethodProxy, - fqsn: str, - - # blank to start which tells ib to look up the latest datum - end_dt: str = '', - -) -> (dict, np.ndarray): - ''' - Retrieve historical data from a ``trio``-side task using - a ``MethoProxy``. - - ''' - fails = 0 - bars: Optional[list] = None - first_dt: datetime = None - last_dt: datetime = None - - if end_dt: - last_dt = pendulum.from_timestamp(end_dt.timestamp()) - - for _ in range(10): - try: - out = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - ) - if out: - bars, bars_array = out - - else: - await tractor.breakpoint() - - if bars_array is None: - raise SymbolNotFound(fqsn) - - first_dt = pendulum.from_timestamp( - bars[0].date.timestamp()) - - last_dt = pendulum.from_timestamp( - bars[-1].date.timestamp()) - - time = bars_array['time'] - assert time[-1] == last_dt.timestamp() - assert time[0] == first_dt.timestamp() - log.info( - f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' - ) - - return (bars, bars_array, first_dt, last_dt), fails - - except RequestError as err: - msg = err.message - # why do we always need to rebind this? - # _err = err - - if 'No market data permissions for' in msg: - # TODO: signalling for no permissions searches - raise NoData( - f'Symbol: {fqsn}', - ) - - elif ( - err.code == 162 - and 'HMDS query returned no data' in err.message - ): - # XXX: this is now done in the storage mgmt layer - # and we shouldn't implicitly decrement the frame dt - # index since the upper layer may be doing so - # concurrently and we don't want to be delivering frames - # that weren't asked for. 
- log.warning( - f'NO DATA found ending @ {end_dt}\n' - ) - - # try to decrement start point and look further back - # end_dt = last_dt = last_dt.subtract(seconds=2000) - - raise NoData( - f'Symbol: {fqsn}', - frame_size=2000, - ) - - elif _pacing in msg: - - log.warning( - 'History throttle rate reached!\n' - 'Resetting farms with `ctrl-alt-f` hack\n' - ) - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( - 'HMDS data farm connection is OK:ushmds' - ) - - # XXX: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. - # reconnect_start = proxy.status_event( - # 'Market data farm is connecting:usfuture' - # ) - # live_ev = proxy.status_event( - # 'Market data farm connection is OK:usfuture' - # ) - - # try to wait on the reset event(s) to arrive, a timeout - # will trigger a retry up to 6 times (for now). - tries: int = 2 - timeout: float = 10 - - # try 3 time with a data reset then fail over to - # a connection reset. - for i in range(1, tries): - - log.warning('Sending DATA RESET request') - await data_reset_hack(reset_type='data') - - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") - break - - if cs.cancelled_caught: - fails += 1 - log.warning( - f'Data reset {name} timeout, retrying {i}.' - ) - - continue - else: - - log.warning('Sending CONNECTION RESET') - await data_reset_hack(reset_type='connection') - - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") - - if cs.cancelled_caught: - fails += 1 - log.warning('Data CONNECTION RESET timeout!?') - - else: - raise - - return None, None - # else: # throttle wasn't fixed so error out immediately - # raise _err - - -@acm -async def open_history_client( - symbol: str, - -) -> tuple[Callable, int]: - ''' - History retreival endpoint - delivers a historical frame callble - that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. - - ''' - async with open_data_client() as proxy: - - async def get_hist( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, - - ) -> tuple[np.ndarray, str]: - - out, fails = await get_bars(proxy, symbol, end_dt=end_dt) - - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range - if out is None: - # could be trying to retreive bars over weekend - log.error(f"Can't grab bars starting at {end_dt}!?!?") - raise NoData( - f'{end_dt}', - frame_size=2000, - ) - - bars, bars_array, first_dt, last_dt = out - - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - - return bars_array, first_dt, last_dt - - # TODO: it seems like we can do async queries for ohlc - # but getting the order right still isn't working and I'm not - # quite sure why.. needs some tinkering and probably - # a lookthrough of the ``ib_insync`` machinery, for eg. maybe - # we have to do the batch queries on the `asyncio` side? 
- yield get_hist, {'erlangs': 1, 'rate': 6} - - -async def backfill_bars( - - fqsn: str, - shm: ShmArray, # type: ignore # noqa - - # TODO: we want to avoid overrunning the underlying shm array buffer - # and we should probably calc the number of calls to make depending - # on that until we have the `marketstore` daemon in place in which - # case the shm size will be driven by user config and available sys - # memory. - count: int = 16, - - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - - TODO: avoid pacing constraints: - https://github.com/pikers/piker/issues/128 - - ''' - # last_dt1 = None - last_dt = None - - with trio.CancelScope() as cs: - - async with open_data_client() as proxy: - - out, fails = await get_bars(proxy, fqsn) - - if out is None: - raise RuntimeError("Could not pull currrent history?!") - - (first_bars, bars_array, first_dt, last_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - last_dt = first_dt - - # write historical data to buffer - shm.push(bars_array) - - task_status.started(cs) - - i = 0 - while i < count: - - out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) - - if out is None: - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and - # only grab valid bars in the range - log.error(f"Can't grab bars starting at {first_dt}!?!?") - - # XXX: get_bars() should internally decrement dt by - # 2k seconds and try again. - continue - - (first_bars, bars_array, first_dt, last_dt) = out - # last_dt1 = last_dt - # last_dt = first_dt - - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. - - shm.push(bars_array, prepend=True) - i += 1 - - -asset_type_map = { - 'STK': 'stock', - 'OPT': 'option', - 'FUT': 'future', - 'CONTFUT': 'continuous_future', - 'CASH': 'forex', - 'IND': 'index', - 'CFD': 'cfd', - 'BOND': 'bond', - 'CMDTY': 'commodity', - 'FOP': 'futures_option', - 'FUND': 'mutual_fund', - 'WAR': 'warrant', - 'IOPT': 'warran', - 'BAG': 'bag', - # 'NEWS': 'news', -} - - -_quote_streams: dict[str, trio.abc.ReceiveStream] = {} - - -async def _setup_quote_stream( - - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - - symbol: str, - opts: tuple[int] = ( - '375', # RT trade volume (excludes utrades) - '233', # RT trade volume (includes utrades) - '236', # Shortable shares - - # these all appear to only be updated every 25s thus - # making them mostly useless and explains why the scanner - # is always slow XD - # '293', # Trade count for day - '294', # Trade rate / minute - '295', # Vlm rate / minute - ), - contract: Optional[Contract] = None, - -) -> trio.abc.ReceiveChannel: - ''' - Stream a ticker using the std L1 api. - - This task is ``asyncio``-side and must be called from - ``tractor.to_asyncio.open_channel_from()``. 
- - ''' - global _quote_streams - - to_trio.send_nowait(None) - - async with load_aio_clients() as accts2clients: - caccount_name, client = get_preferred_data_client(accts2clients) - contract = contract or (await client.find_contract(symbol)) - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) - - # NOTE: it's batch-wise and slow af but I guess could - # be good for backchecking? Seems to be every 5s maybe? - # ticker: Ticker = client.ib.reqTickByTickData( - # contract, 'Last', - # ) - - # # define a simple queue push routine that streams quote packets - # # to trio over the ``to_trio`` memory channel. - # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore - def teardown(): - ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - client.ib.cancelMktData(contract) - - # decouple broadcast mem chan - _quote_streams.pop(symbol, None) - - def push(t: Ticker) -> None: - """ - Push quotes to trio task. - - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - - except ( - trio.BrokenResourceError, - - # XXX: HACK, not sure why this gets left stale (probably - # due to our terrible ``tractor.to_asyncio`` - # implementation for streams.. but if the mem chan - # gets left here and starts blocking just kill the feed? - # trio.WouldBlock, - ): - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. - teardown() - except trio.WouldBlock: - log.warning( - f'channel is blocking symbol feed for {symbol}?' - f'\n{to_trio.statistics}' - ) - - # except trio.WouldBlock: - # # for slow debugging purposes to avoid clobbering prompt - # # with log msgs - # pass - - ticker.updateEvent.connect(push) - try: - await asyncio.sleep(float('inf')) - finally: - teardown() - - # return from_aio - - -@acm -async def open_aio_quote_stream( - - symbol: str, - contract: Optional[Contract] = None, - -) -> trio.abc.ReceiveStream: - - from tractor.trionics import broadcast_receiver - global _quote_streams - - from_aio = _quote_streams.get(symbol) - if from_aio: - - # if we already have a cached feed deliver a rx side clone to consumer - async with broadcast_receiver( - from_aio, - 2**6, - ) as from_aio: - yield from_aio - return - - async with tractor.to_asyncio.open_channel_from( - _setup_quote_stream, - symbol=symbol, - contract=contract, - - ) as (first, from_aio): - - # cache feed for later consumers - _quote_streams[symbol] = from_aio - - yield from_aio - - -async def stream_quotes( - - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Stream symbol quotes. - - This is a ``trio`` callable routine meant to be invoked - once the brokerd is up. - - ''' - # TODO: support multiple subscriptions - sym = symbols[0] - log.info(f'request for real-time quotes: {sym}') - - async with open_data_client() as proxy: - - con, first_ticker, details = await proxy.get_sym_details(symbol=sym) - first_quote = normalize(first_ticker) - # print(f'first quote: {first_quote}') - - def mk_init_msgs() -> dict[str, dict]: - ''' - Collect a bunch of meta-data useful for feed startup and - pack in a `dict`-msg. - - ''' - # pass back some symbol info like min_tick, trading_hours, etc. 
- syminfo = asdict(details) - syminfo.update(syminfo['contract']) - - # nested dataclass we probably don't need and that won't IPC - # serialize - syminfo.pop('secIdList') - - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 - - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - - # for "traditional" assets, volume is normally discreet, not - # a float - syminfo['lot_tick_size'] = 0.0 - - ibclient = proxy._aio_ns.ib.client - host, port = ibclient.host, ibclient.port - - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': syminfo, - 'fqsn': first_quote['fqsn'], - }, - 'status': { - 'data_ep': f'{host}:{port}', - }, - - } - return init_msgs - - init_msgs = mk_init_msgs() - - # TODO: we should instead spawn a task that waits on a feed to start - # and let it wait indefinitely..instead of this hard coded stuff. - with trio.move_on_after(1): - contract, first_ticker, details = await proxy.get_quote(symbol=sym) - - # it might be outside regular trading hours so see if we can at - # least grab history. - if isnan(first_ticker.last): - task_status.started((init_msgs, first_quote)) - - # it's not really live but this will unblock - # the brokerd feed task to tell the ui to update? - feed_is_live.set() - - # block and let data history backfill code run. - await trio.sleep_forever() - return # we never expect feed to come up? - - async with open_aio_quote_stream( - symbol=sym, - contract=con, - ) as stream: - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - - task_status.started((init_msgs, first_quote)) - - async with aclosing(stream): - if type(first_ticker.contract) not in ( - ibis.Commodity, - ibis.Forex - ): - # wait for real volume on feed (trading might be closed) - while True: - ticker = await stream.receive() - - # for a real volume contract we rait for the first - # "real" trade to take place - if ( - # not calc_price - # and not ticker.rtTime - not ticker.rtTime - ): - # spin consuming tickers until we get a real - # market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - quote = normalize(ticker) - log.debug(f"First ticker received {quote}") - - # tell caller quotes are now coming in live - feed_is_live.set() - - # last = time.time() - async for ticker in stream: - quote = normalize(ticker) - await send_chan.send({quote['fqsn']: quote}) - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] - # last = time.time() - - def pack_position( pos: Position @@ -2461,156 +1725,6 @@ async def deliver_trade_events( await ems_stream.send(msg.dict()) -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> None: - - # TODO: load user defined symbol set locally for fast search? 
- await ctx.started({}) - - async with open_data_client() as proxy: - async with ctx.open_stream() as stream: - - last = time.time() - - async for pattern in stream: - log.debug(f'received {pattern}') - now = time.time() - - assert pattern, 'IB can not accept blank search pattern' - - # throttle search requests to no faster then 1Hz - diff = now - last - if diff < 1.0: - log.debug('throttle sleeping') - await trio.sleep(diff) - try: - pattern = stream.receive_nowait() - except trio.WouldBlock: - pass - - if not pattern or pattern.isspace(): - log.warning('empty pattern received, skipping..') - - # TODO: *BUG* if nothing is returned here the client - # side will cache a null set result and not showing - # anything to the use on re-searches when this query - # timed out. We probably need a special "timeout" msg - # or something... - - # XXX: this unblocks the far end search task which may - # hold up a multi-search nursery block - await stream.send({}) - - continue - - log.debug(f'searching for {pattern}') - - last = time.time() - - # async batch search using api stocks endpoint and module - # defined adhoc symbol set. - stock_results = [] - - async def stash_results(target: Awaitable[list]): - stock_results.extend(await target) - - async with trio.open_nursery() as sn: - sn.start_soon( - stash_results, - proxy.search_symbols( - pattern=pattern, - upto=5, - ), - ) - - # trigger async request - await trio.sleep(0) - - # match against our ad-hoc set immediately - adhoc_matches = fuzzy.extractBests( - pattern, - list(_adhoc_futes_set), - score_cutoff=90, - ) - log.info(f'fuzzy matched adhocs: {adhoc_matches}') - adhoc_match_results = {} - if adhoc_matches: - # TODO: do we need to pull contract details? - adhoc_match_results = {i[0]: {} for i in adhoc_matches} - - log.debug(f'fuzzy matching stocks {stock_results}') - stock_matches = fuzzy.extractBests( - pattern, - stock_results, - score_cutoff=50, - ) - - matches = adhoc_match_results | { - item[0]: {} for item in stock_matches - } - # TODO: we used to deliver contract details - # {item[2]: item[0] for item in stock_matches} - - log.debug(f"sending matches: {matches.keys()}") - await stream.send(matches) - - -async def data_reset_hack( - reset_type: str = 'data', - -) -> None: - ''' - Run key combos for resetting data feeds and yield back to caller - when complete. - - This is a linux-only hack around: - - https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations - - TODOs: - - a return type that hopefully determines if the hack was - successful. - - other OS support? - - integration with ``ib-gw`` run in docker + Xorg? - - ''' - - async def vnc_click_hack( - reset_type: str = 'data' - ) -> None: - ''' - Reset the data or netowork connection for the VNC attached - ib gateway using magic combos. 
- - ''' - key = {'data': 'f', 'connection': 'r'}[reset_type] - - import asyncvnc - - async with asyncvnc.connect( - 'localhost', - port=3003, - # password='ibcansmbz', - ) as client: - - # move to middle of screen - # 640x1800 - client.mouse.move( - x=500, - y=500, - ) - client.mouse.click() - client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked - - await tractor.to_asyncio.run_task(vnc_click_hack) - - # we don't really need the ``xdotool`` approach any more B) - return True - - def load_flex_trades( path: Optional[str] = None, @@ -2680,7 +1794,7 @@ def load_flex_trades( try: config.write(section, 'trades') except KeyError: - import pdbpp; pdbpp.set_trace() + import pdbpp; pdbpp.set_trace() # noqa if __name__ == '__main__': diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py new file mode 100644 index 00000000..d533af60 --- /dev/null +++ b/piker/brokers/ib/feed.py @@ -0,0 +1,938 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +""" +Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``. + +""" +from __future__ import annotations +import asyncio +from contextlib import asynccontextmanager as acm +from dataclasses import asdict +from datetime import datetime +from math import isnan +import time +from typing import ( + Callable, + Optional, + Awaitable, +) + +from async_generator import aclosing +from fuzzywuzzy import process as fuzzy +import numpy as np +import pendulum +import tractor +import trio +from trio_typing import TaskStatus + +from piker.data._sharedmem import ShmArray +from .._util import SymbolNotFound, NoData +from .client import ( + _adhoc_futes_set, + log, + load_aio_clients, + ibis, + MethodProxy, + open_client_proxies, + get_preferred_data_client, + Ticker, + RequestError, + Contract, +) + + +# https://interactivebrokers.github.io/tws-api/tick_types.html +tick_types = { + 77: 'trade', + + # a "utrade" aka an off exchange "unreportable" (dark) vlm: + # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume + 48: 'dark_trade', + + # standard L1 ticks + 0: 'bsize', + 1: 'bid', + 2: 'ask', + 3: 'asize', + 4: 'last', + 5: 'size', + 8: 'volume', + + # ``ib_insync`` already packs these into + # quotes under the following fields. + # 55: 'trades_per_min', # `'tradeRate'` + # 56: 'vlm_per_min', # `'volumeRate'` + # 89: 'shortable', # `'shortableShares'` +} + + +@acm +async def open_data_client() -> MethodProxy: + ''' + Open the first found preferred "data client" as defined in the + user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable + and deliver that client wrapped in a ``MethodProxy``. 
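+
+    A minimal usage sketch (the fqsn value is hypothetical; ``bars()``
+    is the same proxy method driven by ``get_bars()`` below):
+
+        async with open_data_client() as proxy:
+            out = await proxy.bars(fqsn='mnq.globex', end_dt='')
+            if out:
+                bars, bars_array = out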
+ + ''' + async with ( + open_client_proxies() as (proxies, clients), + ): + account_name, client = get_preferred_data_client(clients) + proxy = proxies.get(f'ib.{account_name}') + if not proxy: + raise ValueError( + f'No preferred data client could be found for {account_name}!' + ) + + yield proxy + + +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + ''' + History retreival endpoint - delivers a historical frame callble + that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. + + ''' + async with open_data_client() as proxy: + + async def get_hist( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[np.ndarray, str]: + + out, fails = await get_bars(proxy, symbol, end_dt=end_dt) + + # TODO: add logic here to handle tradable hours and only grab + # valid bars in the range + if out is None: + # could be trying to retreive bars over weekend + log.error(f"Can't grab bars starting at {end_dt}!?!?") + raise NoData( + f'{end_dt}', + frame_size=2000, + ) + + bars, bars_array, first_dt, last_dt = out + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + return bars_array, first_dt, last_dt + + # TODO: it seems like we can do async queries for ohlc + # but getting the order right still isn't working and I'm not + # quite sure why.. needs some tinkering and probably + # a lookthrough of the ``ib_insync`` machinery, for eg. maybe + # we have to do the batch queries on the `asyncio` side? + yield get_hist, {'erlangs': 1, 'rate': 6} + + +_pacing: str = ( + 'Historical Market Data Service error ' + 'message:Historical data request pacing violation' +) + + +async def get_bars( + + proxy: MethodProxy, + fqsn: str, + + # blank to start which tells ib to look up the latest datum + end_dt: str = '', + +) -> (dict, np.ndarray): + ''' + Retrieve historical data from a ``trio``-side task using + a ``MethoProxy``. + + ''' + fails = 0 + bars: Optional[list] = None + first_dt: datetime = None + last_dt: datetime = None + + if end_dt: + last_dt = pendulum.from_timestamp(end_dt.timestamp()) + + for _ in range(10): + try: + out = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + ) + if out: + bars, bars_array = out + + else: + await tractor.breakpoint() + + if bars_array is None: + raise SymbolNotFound(fqsn) + + first_dt = pendulum.from_timestamp( + bars[0].date.timestamp()) + + last_dt = pendulum.from_timestamp( + bars[-1].date.timestamp()) + + time = bars_array['time'] + assert time[-1] == last_dt.timestamp() + assert time[0] == first_dt.timestamp() + log.info( + f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + ) + + return (bars, bars_array, first_dt, last_dt), fails + + except RequestError as err: + msg = err.message + # why do we always need to rebind this? + # _err = err + + if 'No market data permissions for' in msg: + # TODO: signalling for no permissions searches + raise NoData( + f'Symbol: {fqsn}', + ) + + elif ( + err.code == 162 + and 'HMDS query returned no data' in err.message + ): + # XXX: this is now done in the storage mgmt layer + # and we shouldn't implicitly decrement the frame dt + # index since the upper layer may be doing so + # concurrently and we don't want to be delivering frames + # that weren't asked for. 
+ log.warning( + f'NO DATA found ending @ {end_dt}\n' + ) + + # try to decrement start point and look further back + # end_dt = last_dt = last_dt.subtract(seconds=2000) + + raise NoData( + f'Symbol: {fqsn}', + frame_size=2000, + ) + + elif _pacing in msg: + + log.warning( + 'History throttle rate reached!\n' + 'Resetting farms with `ctrl-alt-f` hack\n' + ) + # TODO: we might have to put a task lock around this + # method.. + hist_ev = proxy.status_event( + 'HMDS data farm connection is OK:ushmds' + ) + + # XXX: other event messages we might want to try and + # wait for but i wasn't able to get any of this + # reliable.. + # reconnect_start = proxy.status_event( + # 'Market data farm is connecting:usfuture' + # ) + # live_ev = proxy.status_event( + # 'Market data farm connection is OK:usfuture' + # ) + + # try to wait on the reset event(s) to arrive, a timeout + # will trigger a retry up to 6 times (for now). + tries: int = 2 + timeout: float = 10 + + # try 3 time with a data reset then fail over to + # a connection reset. + for i in range(1, tries): + + log.warning('Sending DATA RESET request') + await data_reset_hack(reset_type='data') + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + break + + if cs.cancelled_caught: + fails += 1 + log.warning( + f'Data reset {name} timeout, retrying {i}.' + ) + + continue + else: + + log.warning('Sending CONNECTION RESET') + await data_reset_hack(reset_type='connection') + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + + if cs.cancelled_caught: + fails += 1 + log.warning('Data CONNECTION RESET timeout!?') + + else: + raise + + return None, None + # else: # throttle wasn't fixed so error out immediately + # raise _err + + +async def backfill_bars( + + fqsn: str, + shm: ShmArray, # type: ignore # noqa + + # TODO: we want to avoid overrunning the underlying shm array buffer + # and we should probably calc the number of calls to make depending + # on that until we have the `marketstore` daemon in place in which + # case the shm size will be driven by user config and available sys + # memory. + count: int = 16, + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Fill historical bars into shared mem / storage afap. 
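+    ('afap' = as fast as possible).
+
+    Launch sketch (assumes an open ``trio`` nursery and an already
+    allocated ``ShmArray``; ``task_status.started()`` hands back the
+    cancel scope):
+
+        cs: trio.CancelScope = await nursery.start(
+            backfill_bars, fqsn, shm,
+        )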
+ + TODO: avoid pacing constraints: + https://github.com/pikers/piker/issues/128 + + ''' + # last_dt1 = None + last_dt = None + + with trio.CancelScope() as cs: + + async with open_data_client() as proxy: + + out, fails = await get_bars(proxy, fqsn) + + if out is None: + raise RuntimeError("Could not pull currrent history?!") + + (first_bars, bars_array, first_dt, last_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + last_dt = first_dt + + # write historical data to buffer + shm.push(bars_array) + + task_status.started(cs) + + i = 0 + while i < count: + + out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) + + if out is None: + # could be trying to retreive bars over weekend + # TODO: add logic here to handle tradable hours and + # only grab valid bars in the range + log.error(f"Can't grab bars starting at {first_dt}!?!?") + + # XXX: get_bars() should internally decrement dt by + # 2k seconds and try again. + continue + + (first_bars, bars_array, first_dt, last_dt) = out + # last_dt1 = last_dt + # last_dt = first_dt + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + # TODO we should probably dig into forums to see what peeps + # think this data "means" and then use it as an indicator of + # sorts? dinkus has mentioned that $vlms for the day dont' + # match other platforms nor the summary stat tws shows in + # the monitor - it's probably worth investigating. + + shm.push(bars_array, prepend=True) + i += 1 + + +asset_type_map = { + 'STK': 'stock', + 'OPT': 'option', + 'FUT': 'future', + 'CONTFUT': 'continuous_future', + 'CASH': 'forex', + 'IND': 'index', + 'CFD': 'cfd', + 'BOND': 'bond', + 'CMDTY': 'commodity', + 'FOP': 'futures_option', + 'FUND': 'mutual_fund', + 'WAR': 'warrant', + 'IOPT': 'warran', + 'BAG': 'bag', + # 'NEWS': 'news', +} + + +_quote_streams: dict[str, trio.abc.ReceiveStream] = {} + + +async def _setup_quote_stream( + + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + + symbol: str, + opts: tuple[int] = ( + '375', # RT trade volume (excludes utrades) + '233', # RT trade volume (includes utrades) + '236', # Shortable shares + + # these all appear to only be updated every 25s thus + # making them mostly useless and explains why the scanner + # is always slow XD + # '293', # Trade count for day + '294', # Trade rate / minute + '295', # Vlm rate / minute + ), + contract: Optional[Contract] = None, + +) -> trio.abc.ReceiveChannel: + ''' + Stream a ticker using the std L1 api. + + This task is ``asyncio``-side and must be called from + ``tractor.to_asyncio.open_channel_from()``. + + ''' + global _quote_streams + + to_trio.send_nowait(None) + + async with load_aio_clients() as accts2clients: + caccount_name, client = get_preferred_data_client(accts2clients) + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + + # NOTE: it's batch-wise and slow af but I guess could + # be good for backchecking? Seems to be every 5s maybe? + # ticker: Ticker = client.ib.reqTickByTickData( + # contract, 'Last', + # ) + + # # define a simple queue push routine that streams quote packets + # # to trio over the ``to_trio`` memory channel. 
+ # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + def teardown(): + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + client.ib.cancelMktData(contract) + + # decouple broadcast mem chan + _quote_streams.pop(symbol, None) + + def push(t: Ticker) -> None: + """ + Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + + except ( + trio.BrokenResourceError, + + # XXX: HACK, not sure why this gets left stale (probably + # due to our terrible ``tractor.to_asyncio`` + # implementation for streams.. but if the mem chan + # gets left here and starts blocking just kill the feed? + # trio.WouldBlock, + ): + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + teardown() + except trio.WouldBlock: + log.warning( + f'channel is blocking symbol feed for {symbol}?' + f'\n{to_trio.statistics}' + ) + + # except trio.WouldBlock: + # # for slow debugging purposes to avoid clobbering prompt + # # with log msgs + # pass + + ticker.updateEvent.connect(push) + try: + await asyncio.sleep(float('inf')) + finally: + teardown() + + # return from_aio + + +@acm +async def open_aio_quote_stream( + + symbol: str, + contract: Optional[Contract] = None, + +) -> trio.abc.ReceiveStream: + + from tractor.trionics import broadcast_receiver + global _quote_streams + + from_aio = _quote_streams.get(symbol) + if from_aio: + + # if we already have a cached feed deliver a rx side clone to consumer + async with broadcast_receiver( + from_aio, + 2**6, + ) as from_aio: + yield from_aio + return + + async with tractor.to_asyncio.open_channel_from( + _setup_quote_stream, + symbol=symbol, + contract=contract, + + ) as (first, from_aio): + + # cache feed for later consumers + _quote_streams[symbol] = from_aio + + yield from_aio + + +# TODO: cython/mypyc/numba this! +def normalize( + ticker: Ticker, + calc_price: bool = False + +) -> dict: + + # should be real volume for this contract by default + calc_price = False + + # check for special contract types + con = ticker.contract + if type(con) in ( + ibis.Commodity, + ibis.Forex, + ): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True + + else: + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + expiry = con.lastTradeDateOrContractMonth + if expiry: + suffix += f'.{expiry}' + + # convert named tuples to dicts so we send usable keys + new_ticks = [] + for tick in ticker.ticks: + if tick and not isinstance(tick, dict): + td = tick._asdict() + td['type'] = tick_types.get( + td['tickType'], + 'n/a', + ) + + new_ticks.append(td) + + tbt = ticker.tickByTicks + if tbt: + print(f'tickbyticks:\n {ticker.tickByTicks}') + + ticker.ticks = new_ticks + + # some contracts don't have volume so we may want to calculate + # a midpoint price based on data we can acquire (such as bid / ask) + if calc_price: + ticker.ticks.append( + {'type': 'trade', 'price': ticker.marketPrice()} + ) + + # serialize for transport + data = asdict(ticker) + + # generate fqsn with possible specialized suffix + # for derivatives, note the lowercase. 
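+    # e.g. (hypothetical values) a ``Contract(symbol='MNQ',
+    # exchange='GLOBEX', lastTradeDateOrContractMonth='20220617')``
+    # maps to the fqsn 'mnq.globex.20220617'.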
+ data['symbol'] = data['fqsn'] = '.'.join( + (con.symbol, suffix) + ).lower() + + # convert named tuples to dicts for transport + tbts = data.get('tickByTicks') + if tbts: + data['tickByTicks'] = [tbt._asdict() for tbt in tbts] + + # add time stamps for downstream latency measurements + data['brokerd_ts'] = time.time() + + # stupid stupid shit...don't even care any more.. + # leave it until we do a proper latency study + # if ticker.rtTime is not None: + # data['broker_ts'] = data['rtTime_s'] = float( + # ticker.rtTime.timestamp) / 1000. + data.pop('rtTime') + + return data + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Stream symbol quotes. + + This is a ``trio`` callable routine meant to be invoked + once the brokerd is up. + + ''' + # TODO: support multiple subscriptions + sym = symbols[0] + log.info(f'request for real-time quotes: {sym}') + + async with open_data_client() as proxy: + + con, first_ticker, details = await proxy.get_sym_details(symbol=sym) + first_quote = normalize(first_ticker) + # print(f'first quote: {first_quote}') + + def mk_init_msgs() -> dict[str, dict]: + ''' + Collect a bunch of meta-data useful for feed startup and + pack in a `dict`-msg. + + ''' + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) + + # nested dataclass we probably don't need and that won't IPC + # serialize + syminfo.pop('secIdList') + + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] + + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 + + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) + + # for "traditional" assets, volume is normally discreet, not + # a float + syminfo['lot_tick_size'] = 0.0 + + ibclient = proxy._aio_ns.ib.client + host, port = ibclient.host, ibclient.port + + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, + 'fqsn': first_quote['fqsn'], + }, + 'status': { + 'data_ep': f'{host}:{port}', + }, + + } + return init_msgs + + init_msgs = mk_init_msgs() + + # TODO: we should instead spawn a task that waits on a feed to start + # and let it wait indefinitely..instead of this hard coded stuff. + with trio.move_on_after(1): + contract, first_ticker, details = await proxy.get_quote(symbol=sym) + + # it might be outside regular trading hours so see if we can at + # least grab history. + if isnan(first_ticker.last): + task_status.started((init_msgs, first_quote)) + + # it's not really live but this will unblock + # the brokerd feed task to tell the ui to update? + feed_is_live.set() + + # block and let data history backfill code run. + await trio.sleep_forever() + return # we never expect feed to come up? 
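+
+        # (flow sketch for the block below) once a live quote is
+        # confirmed, connect the ``asyncio``-side ticker stream, wait
+        # for the first "real" (non-stale) tick, then relay normalized
+        # quotes to downstream consumers via ``send_chan``.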
+ + async with open_aio_quote_stream( + symbol=sym, + contract=con, + ) as stream: + + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + task_status.started((init_msgs, first_quote)) + + async with aclosing(stream): + if type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ): + # wait for real volume on feed (trading might be closed) + while True: + ticker = await stream.receive() + + # for a real volume contract we rait for the first + # "real" trade to take place + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): + # spin consuming tickers until we get a real + # market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] + + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + + # tell caller quotes are now coming in live + feed_is_live.set() + + # last = time.time() + async for ticker in stream: + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() + + +async def data_reset_hack( + reset_type: str = 'data', + +) -> None: + ''' + Run key combos for resetting data feeds and yield back to caller + when complete. + + This is a linux-only hack around: + + https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations + + TODOs: + - a return type that hopefully determines if the hack was + successful. + - other OS support? + - integration with ``ib-gw`` run in docker + Xorg? + + ''' + + async def vnc_click_hack( + reset_type: str = 'data' + ) -> None: + ''' + Reset the data or netowork connection for the VNC attached + ib gateway using magic combos. + + ''' + key = {'data': 'f', 'connection': 'r'}[reset_type] + + import asyncvnc + + async with asyncvnc.connect( + 'localhost', + port=3003, + # password='ibcansmbz', + ) as client: + + # move to middle of screen + # 640x1800 + client.mouse.move( + x=500, + y=500, + ) + client.mouse.click() + client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked + + await tractor.to_asyncio.run_task(vnc_click_hack) + + # we don't really need the ``xdotool`` approach any more B) + return True + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, + +) -> None: + + # TODO: load user defined symbol set locally for fast search? + await ctx.started({}) + + async with open_data_client() as proxy: + async with ctx.open_stream() as stream: + + last = time.time() + + async for pattern in stream: + log.debug(f'received {pattern}') + now = time.time() + + assert pattern, 'IB can not accept blank search pattern' + + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + except trio.WouldBlock: + pass + + if not pattern or pattern.isspace(): + log.warning('empty pattern received, skipping..') + + # TODO: *BUG* if nothing is returned here the client + # side will cache a null set result and not showing + # anything to the use on re-searches when this query + # timed out. We probably need a special "timeout" msg + # or something... 
+ + # XXX: this unblocks the far end search task which may + # hold up a multi-search nursery block + await stream.send({}) + + continue + + log.debug(f'searching for {pattern}') + + last = time.time() + + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + stock_results.extend(await target) + + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + proxy.search_symbols( + pattern=pattern, + upto=5, + ), + ) + + # trigger async request + await trio.sleep(0) + + # match against our ad-hoc set immediately + adhoc_matches = fuzzy.extractBests( + pattern, + list(_adhoc_futes_set), + score_cutoff=90, + ) + log.info(f'fuzzy matched adhocs: {adhoc_matches}') + adhoc_match_results = {} + if adhoc_matches: + # TODO: do we need to pull contract details? + adhoc_match_results = {i[0]: {} for i in adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( + pattern, + stock_results, + score_cutoff=50, + ) + + matches = adhoc_match_results | { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} + + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches) diff --git a/piker/data/feed.py b/piker/data/feed.py index 1165fddc..66ced72f 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -700,6 +700,7 @@ async def manage_history( bfqsn = fqsn.replace('.' + mod.name, '') open_history_client = getattr(mod, 'open_history_client', None) + assert open_history_client if is_up and opened and open_history_client: From 85c2f6e79fa77d1faae05a9f9381b12998f150bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 15:48:39 -0400 Subject: [PATCH 3/6] Factor trades endpoint into `.ib.broker.py` --- piker/brokers/ib/__init__.py | 3 +- piker/brokers/ib/broker.py | 593 +++++++++++++++++++++++++++++++++++ piker/brokers/ib/client.py | 543 +------------------------------- 3 files changed, 598 insertions(+), 541 deletions(-) create mode 100644 piker/brokers/ib/broker.py diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 1d6bac33..2d9c198e 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -32,13 +32,13 @@ Sub-modules within break into the core functionalities: """ from .client import ( get_client, - trades_dialogue, ) from .feed import ( open_history_client, open_symbol_search, stream_quotes, ) +from .broker import trades_dialogue __all__ = [ 'get_client', @@ -53,6 +53,7 @@ __all__ = [ __enable_modules__: list[str] = [ 'client', 'feed', + 'broker', ] # passed to ``tractor.ActorNursery.start_actor()`` diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py new file mode 100644 index 00000000..6bd35b37 --- /dev/null +++ b/piker/brokers/ib/broker.py @@ -0,0 +1,593 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +""" +Order and trades endpoints for use with ``piker``'s EMS. + +""" +from __future__ import annotations +from dataclasses import asdict +from functools import partial +from pprint import pformat +import time +from typing import ( + Any, + Optional, + AsyncIterator, +) + +import trio +from trio_typing import TaskStatus +import tractor +from ib_insync.contract import ( + Contract, + Option, +) +from ib_insync.order import ( + Trade, + OrderStatus, +) +from ib_insync.objects import ( + Fill, + Execution, +) +from ib_insync.objects import Position + +from piker import config +from piker.log import get_console_log +from piker.clearing._messages import ( + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdPosition, + BrokerdCancel, + BrokerdFill, + BrokerdError, +) +from .client import ( + _adhoc_futes_set, + log, + get_config, + open_client_proxies, + Client, +) + + +def pack_position( + pos: Position + +) -> dict[str, Any]: + con = pos.contract + + if isinstance(con, Option): + # TODO: option symbol parsing and sane display: + symbol = con.localSymbol.replace(' ', '') + + else: + # TODO: lookup fqsn even for derivs. + symbol = con.symbol.lower() + + exch = (con.primaryExchange or con.exchange).lower() + symkey = '.'.join((symbol, exch)) + if not exch: + # attempt to lookup the symbol from our + # hacked set.. + for sym in _adhoc_futes_set: + if symbol in sym: + symkey = sym + break + + expiry = con.lastTradeDateOrContractMonth + if expiry: + symkey += f'.{expiry}' + + # TODO: options contracts into a sane format.. + + return BrokerdPosition( + broker='ib', + account=pos.account, + symbol=symkey, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ) + + +async def handle_order_requests( + + ems_order_stream: tractor.MsgStream, + accounts_def: dict[str, str], + +) -> None: + + global _accounts2clients + + request_msg: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + account = request_msg['account'] + + acct_number = accounts_def.get(account) + if not acct_number: + log.error( + f'An IB account number for name {account} is not found?\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ).dict()) + continue + + client = _accounts2clients.get(account) + if not client: + log.error( + f'An IB client for account name {account} is not found.\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No api client loaded for account: `{account}` ?', + ).dict()) + continue + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = client.submit_limit( + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + account=acct_number, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) 
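+                # (assumption: ``order.reqid`` is unset/``None`` for
+                # new submissions and carries the prior broker-side
+                # id when an existing order is being modified)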
+                reqid=order.reqid,
+            )
+            if reqid is None:
+                await ems_order_stream.send(BrokerdError(
+                    oid=request_msg['oid'],
+                    symbol=request_msg['symbol'],
+                    reason='Order already active?',
+                ).dict())
+
+            # deliver ack that order has been submitted to broker routing
+            await ems_order_stream.send(
+                BrokerdOrderAck(
+                    # ems order request id
+                    oid=order.oid,
+                    # broker specific request id
+                    reqid=reqid,
+                    time_ns=time.time_ns(),
+                    account=account,
+                ).dict()
+            )
+
+        elif action == 'cancel':
+            msg = BrokerdCancel(**request_msg)
+            client.submit_cancel(reqid=msg.reqid)
+
+        else:
+            log.error(f'Unknown order command: {request_msg}')
+
+
+async def recv_trade_updates(
+
+    client: Client,
+    to_trio: trio.abc.SendChannel,
+
+) -> None:
+    """Stream order status, fill and position update events
+    from the ``ib_insync`` eventkit hooks.
+    """
+    client.inline_errors(to_trio)
+
+    # sync with trio task
+    to_trio.send_nowait(None)
+
+    def push_tradesies(eventkit_obj, obj, fill=None):
+        """Push events to trio task.
+
+        """
+        if fill is not None:
+            # execution details event
+            item = ('fill', (obj, fill))
+
+        elif eventkit_obj.name() == 'positionEvent':
+            item = ('position', obj)
+
+        else:
+            item = ('status', obj)
+
+        log.info(f'eventkit event ->\n{pformat(item)}')
+
+        try:
+            to_trio.send_nowait(item)
+        except trio.BrokenResourceError:
+            log.exception(f'Disconnected from {eventkit_obj} updates')
+            eventkit_obj.disconnect(push_tradesies)
+
+    # hook up to the weird eventkit object - event stream api
+    for ev_name in [
+        'orderStatusEvent',  # all order updates
+        'execDetailsEvent',  # all "fill" updates
+        'positionEvent',  # avg price updates per symbol per account
+
+        # 'commissionReportEvent',
+        # XXX: ugh, it is a separate event from IB and it's
+        # emitted as follows:
+        # self.ib.commissionReportEvent.emit(trade, fill, report)
+
+        # XXX: not sure yet if we need these
+        # 'updatePortfolioEvent',
+
+        # XXX: these all seem to be weird ib_insync internal
+        # events that we probably don't care that much about
+        # given the internal design is wonky af..
+        # 'newOrderEvent',
+        # 'orderModifyEvent',
+        # 'cancelOrderEvent',
+        # 'openOrderEvent',
+    ]:
+        eventkit_obj = getattr(client.ib, ev_name)
+        handler = partial(push_tradesies, eventkit_obj)
+        eventkit_obj.connect(handler)
+
+    # let the engine run and stream
+    await client.ib.disconnectedEvent
+
+
+@tractor.context
+async def trades_dialogue(
+
+    ctx: tractor.Context,
+    loglevel: str = None,
+
+) -> AsyncIterator[dict[str, Any]]:
+
+    # XXX: required to propagate ``tractor`` loglevel to piker logging
+    get_console_log(loglevel or tractor.current_actor().loglevel)
+
+    accounts_def = config.load_accounts(['ib'])
+
+    global _accounts2clients
+    global _client_cache
+
+    # deliver positions to subscriber before anything else
+    all_positions = []
+    accounts = set()
+    clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
+
+    async with (
+        trio.open_nursery() as nurse,
+        open_client_proxies() as (proxies, aioclients),
+    ):
+        # for account, client in _accounts2clients.items():
+        for account, proxy in proxies.items():
+
+            client = aioclients[account]
+
+            async def open_stream(
+                task_status: TaskStatus[
+                    trio.abc.ReceiveChannel
+                ] = trio.TASK_STATUS_IGNORED,
+            ):
+                # each api client has a unique event stream
+                async with tractor.to_asyncio.open_channel_from(
+                    recv_trade_updates,
+                    client=client,
+                ) as (first, trade_event_stream):
+
+                    task_status.started(trade_event_stream)
+                    await trio.sleep_forever()
+
+            trade_event_stream = await nurse.start(open_stream)
+
+            clients.append((client, trade_event_stream))
+
+            assert account in accounts_def
+            accounts.add(account)
+
+        for client in aioclients.values():
+            for pos in client.positions():
+
+                msg = pack_position(pos)
+                msg.account = accounts_def.inverse[msg.account]
+
+                assert msg.account in accounts, (
+                    f'Position for unknown account: {msg.account}')
+
+                all_positions.append(msg.dict())
+
+        trades: list[dict] = []
+        for proxy in proxies.values():
+            trades.append(await proxy.trades())
+
+        log.info(f'Loaded trade histories for {len(trades)} accounts from this session')
+        # TODO: write trades to local ``trades.toml``
+        # - use above per-session trades data and write to local file
+        # - get the "flex reports" working and pull historical data and
+        #   also save locally.
+
+        await ctx.started((
+            all_positions,
+            tuple(name for name in accounts_def if name in accounts),
+        ))
+
+        async with (
+            ctx.open_stream() as ems_stream,
+            trio.open_nursery() as n,
+        ):
+            # start order request handler **before** local trades event loop
+            n.start_soon(handle_order_requests, ems_stream, accounts_def)
+
+            # allocate event relay tasks for each client connection
+            for client, stream in clients:
+                n.start_soon(
+                    deliver_trade_events,
+                    stream,
+                    ems_stream,
+                    accounts_def
+                )
+
+            # block until cancelled
+            await trio.sleep_forever()
+
+
+async def deliver_trade_events(
+
+    trade_event_stream: trio.MemoryReceiveChannel,
+    ems_stream: tractor.MsgStream,
+    accounts_def: dict[str, str],
+
+) -> None:
+    '''Format and relay all trade events for a given client to the EMS.
+
+    '''
+    action_map = {'BOT': 'buy', 'SLD': 'sell'}
+
+    # TODO: for some reason we can receive a ``None`` here when the
+    # ib-gw goes down?  Not sure exactly how that's happening looking
+    # at the eventkit code above but we should probably handle it...
+    async for event_name, item in trade_event_stream:
+
+        log.info(f'ib sending {event_name}:\n{pformat(item)}')
+
+        # TODO: templating the ib statuses in comparison with other
+        # brokers is likely the way to go:
+        # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
+        # short list:
+        # - PendingSubmit
+        # - PendingCancel
+        # - PreSubmitted (simulated orders)
+        # - ApiCancelled (cancelled by client before submission
+        #   to routing)
+        # - Cancelled
+        # - Filled
+        # - Inactive (reject or cancelled but not by trader)
+
+        # XXX: here's some other sucky cases from the api
+        # - short-sale but securities haven't been located, in this
+        #   case we should probably keep the order in some kind of
+        #   weird state or cancel it outright?
+
+        # status='PendingSubmit', message=''),
+        # status='Cancelled', message='Error 404,
+        # reqId 1550: Order held while securities are located.'),
+        # status='PreSubmitted', message='')],
+
+        if event_name == 'status':
+
+            # XXX: begin normalization of nonsense ib_insync internal
+            # object-state tracking representations...
+
+            # unwrap needed data from ib_insync internal types
+            trade: Trade = item
+            status: OrderStatus = trade.orderStatus
+
+            # skip duplicate filled updates - we get the deats
+            # from the execution details event
+            msg = BrokerdStatus(
+
+                reqid=trade.order.orderId,
+                time_ns=time.time_ns(),  # cuz why not
+                account=accounts_def.inverse[trade.order.account],
+
+                # everyone doin camel case..
+                status=status.status.lower(),  # force lower case
+
+                filled=status.filled,
+                reason=status.whyHeld,
+
+                # this seems to not be necessarily up to date in the
+                # execDetails event.. so we have to send it here I guess?
+                remaining=status.remaining,
+
+                broker_details={'name': 'ib'},
+            )
+
+        elif event_name == 'fill':
+
+            # for wtv reason this is a separate event type
+            # from IB, not sure why it's needed other than for extra
+            # complexity and over-engineering :eyeroll:.
+            # we may just end up dropping these events (or
+            # translating them to ``Status`` msgs) if we can
+            # show the equivalent status events are no more latent.
+
+            # unpack ib_insync types
+            # pep-0526 style:
+            # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
+            trade: Trade
+            fill: Fill
+            trade, fill = item
+            execu: Execution = fill.execution
+
+            # TODO: normalize out commissions details?
+            details = {
+                'contract': asdict(fill.contract),
+                'execution': asdict(fill.execution),
+                'commissions': asdict(fill.commissionReport),
+                'broker_time': execu.time,  # supposedly server fill time
+                'name': 'ib',
+            }
+
+            msg = BrokerdFill(
+                # should match the value returned from `.submit_limit()`
+                reqid=execu.orderId,
+                time_ns=time.time_ns(),  # cuz why not
+
+                action=action_map[execu.side],
+                size=execu.shares,
+                price=execu.price,
+
+                broker_details=details,
+                # XXX: required by order mode currently
+                broker_time=details['broker_time'],
+
+            )
+
+        elif event_name == 'error':
+
+            err: dict = item
+
+            # f$#$% gawd dammit insync..
+            con = err['contract']
+            if isinstance(con, Contract):
+                err['contract'] = asdict(con)
+
+            if err['reqid'] == -1:
+                log.error(f'TWS external order error:\n{pformat(err)}')
+
+            # TODO: what schema for this msg if we're going to make it
+            # portable across all backends?
+            # msg = BrokerdError(**err)
+            continue
+
+        elif event_name == 'position':
+            msg = pack_position(item)
+            msg.account = accounts_def.inverse[msg.account]
+
+        elif event_name == 'event':
+
+            # it's either a general system status event or an external
+            # trade event?
+            log.info(f"TWS system status: \n{pformat(item)}")
+
+            # TODO: support this again but needs parsing at the callback
+            # level...
+            # reqid = item.get('reqid', 0)
+            # if getattr(msg, 'reqid', 0) < -1:
+            #     log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+
+            continue
+
+            # msg.reqid = 'tws-' + str(-1 * reqid)
+
+            # mark msg as from "external system"
+            # TODO: probably something better than this.. and start
+            # considering multiplayer/group trades tracking
+            # msg.broker_details['external_src'] = 'tws'
+
+        # XXX: we always serialize to a dict for msgpack
+        # translations, ideally we can move to an msgspec (or other)
+        # encoder that can be enabled in ``tractor`` ahead of
+        # time so we can pass through the message types directly.
+        await ems_stream.send(msg.dict())
+
+
+def load_flex_trades(
+    path: Optional[str] = None,
+
+) -> dict[str, str]:
+
+    from pprint import pprint
+    from ib_insync import flexreport, util
+
+    conf = get_config()
+
+    if not path:
+        # load ``brokers.toml`` and try to get the flex
+        # token and query id that must be previously defined
+        # by the user.
+        token = conf.get('flex_token')
+        if not token:
+            raise ValueError(
+                'You must specify a ``flex_token`` field in your '
+                '`brokers.toml` in order to load your trade log, see our '
+                'instructions for how to set this up here:\n'
+                'PUT LINK HERE!'
+            )
+
+        qid = conf['flex_trades_query_id']
+
+        # TODO: hack this into our logging
+        # system like we do with the API client..
+        util.logToConsole()
+
+        # TODO: rewrite the query part of this with async..httpx?
+        report = flexreport.FlexReport(
+            token=token,
+            queryId=qid,
+        )
+
+    else:
+        # XXX: another project we could potentially look at,
+        # https://pypi.org/project/ibflex/
+        report = flexreport.FlexReport(path=path)
+
+    trade_entries = report.extract('Trade')
+    trades = {
+        # XXX: LOL apparently ``toml`` has a bug
+        # where a section key error will show up in the write
+        # if you leave this as an ``int``?
+        str(t.__dict__['tradeID']): t.__dict__
+        for t in trade_entries
+    }
+
+    ln = len(trades)
+    log.info(f'Loaded {ln} trades from flex query')
+
+    trades_by_account = {}
+    for tid, trade in trades.items():
+        trades_by_account.setdefault(
+            # oddly for some so-called "BookTrade" entries
+            # this field seems to be blank, no cuckin clue.
+            # trade['ibExecID']
+            str(trade['accountId']), {}
+        )[tid] = trade
+
+    section = {'ib': trades_by_account}
+    pprint(section)
+
+    # TODO: load the config first and append in
+    # the new trades loaded here..
+ try: + config.write(section, 'trades') + except KeyError: + import pdbpp; pdbpp.set_trace() # noqa + + +if __name__ == '__main__': + load_flex_trades() diff --git a/piker/brokers/ib/client.py b/piker/brokers/ib/client.py index 43626c04..5405ab27 100644 --- a/piker/brokers/ib/client.py +++ b/piker/brokers/ib/client.py @@ -29,8 +29,6 @@ import itertools from math import isnan from typing import ( Any, - Optional, - AsyncIterator, Union, ) import asyncio @@ -41,13 +39,11 @@ from types import SimpleNamespace import trio -from trio_typing import TaskStatus import tractor from tractor import to_asyncio from ib_insync.wrapper import RequestError -from ib_insync.contract import Contract, ContractDetails, Option -from ib_insync.order import Order, Trade, OrderStatus -from ib_insync.objects import Fill, Execution +from ib_insync.contract import Contract, ContractDetails +from ib_insync.order import Order from ib_insync.ticker import Ticker from ib_insync.objects import Position import ib_insync as ibis @@ -55,15 +51,9 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client import numpy as np - from piker import config -from piker.log import get_logger, get_console_log +from piker.log import get_logger from piker.data._source import base_ohlc_dtype -from piker.clearing._messages import ( - BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdPosition, BrokerdCancel, - BrokerdFill, BrokerdError, -) log = get_logger(__name__) @@ -809,70 +799,6 @@ class Client: return self.ib.positions(account=account) -async def recv_trade_updates( - - client: Client, - to_trio: trio.abc.SendChannel, - -) -> None: - """Stream a ticker using the std L1 api. - """ - client.inline_errors(to_trio) - - # sync with trio task - to_trio.send_nowait(None) - - def push_tradesies(eventkit_obj, obj, fill=None): - """Push events to trio task. - - """ - if fill is not None: - # execution details event - item = ('fill', (obj, fill)) - - elif eventkit_obj.name() == 'positionEvent': - item = ('position', obj) - - else: - item = ('status', obj) - - log.info(f'eventkit event ->\n{pformat(item)}') - - try: - to_trio.send_nowait(item) - except trio.BrokenResourceError: - log.exception(f'Disconnected from {eventkit_obj} updates') - eventkit_obj.disconnect(push_tradesies) - - # hook up to the weird eventkit object - event stream api - for ev_name in [ - 'orderStatusEvent', # all order updates - 'execDetailsEvent', # all "fill" updates - 'positionEvent', # avg price updates per symbol per account - - # 'commissionReportEvent', - # XXX: ugh, it is a separate event from IB and it's - # emitted as follows: - # self.ib.commissionReportEvent.emit(trade, fill, report) - - # XXX: not sure yet if we need these - # 'updatePortfolioEvent', - - # XXX: these all seem to be weird ib_insync intrernal - # events that we probably don't care that much about - # given the internal design is wonky af.. - # 'newOrderEvent', - # 'orderModifyEvent', - # 'cancelOrderEvent', - # 'openOrderEvent', - ]: - eventkit_obj = getattr(client.ib, ev_name) - handler = partial(push_tradesies, eventkit_obj) - eventkit_obj.connect(handler) - - # let the engine run and stream - await client.ib.disconnectedEvent - # per-actor API ep caching _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() @@ -1336,466 +1262,3 @@ async def get_client( # actor isn't in aio mode. 
async with open_data_client() as proxy: yield proxy - - -def pack_position( - pos: Position - -) -> dict[str, Any]: - con = pos.contract - - if isinstance(con, Option): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - else: - # TODO: lookup fqsn even for derivs. - symbol = con.symbol.lower() - - exch = (con.primaryExchange or con.exchange).lower() - symkey = '.'.join((symbol, exch)) - if not exch: - # attempt to lookup the symbol from our - # hacked set.. - for sym in _adhoc_futes_set: - if symbol in sym: - symkey = sym - break - - expiry = con.lastTradeDateOrContractMonth - if expiry: - symkey += f'.{expiry}' - - # TODO: options contracts into a sane format.. - - return BrokerdPosition( - broker='ib', - account=pos.account, - symbol=symkey, - currency=con.currency, - size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), - ) - - -async def handle_order_requests( - - ems_order_stream: tractor.MsgStream, - accounts_def: dict[str, str], - -) -> None: - - global _accounts2clients - - request_msg: dict - async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') - - action = request_msg['action'] - account = request_msg['account'] - - acct_number = accounts_def.get(account) - if not acct_number: - log.error( - f'An IB account number for name {account} is not found?\n' - 'Make sure you have all TWS and GW instances running.' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No account found: `{account}` ?', - ).dict()) - continue - - client = _accounts2clients.get(account) - if not client: - log.error( - f'An IB client for account name {account} is not found.\n' - 'Make sure you have all TWS and GW instances running.' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No api client loaded for account: `{account}` ?', - ).dict()) - continue - - if action in {'buy', 'sell'}: - # validate - order = BrokerdOrder(**request_msg) - - # call our client api to submit the order - reqid = client.submit_limit( - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - account=acct_number, - - # XXX: by default 0 tells ``ib_insync`` methods that - # there is no existing order so ask the client to create - # a new one (which it seems to do by allocating an int - # counter - collision prone..) 
- reqid=order.reqid, - ) - if reqid is None: - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason='Order already active?', - ).dict()) - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - # ems order request id - oid=order.oid, - # broker specific request id - reqid=reqid, - time_ns=time.time_ns(), - account=account, - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - client.submit_cancel(reqid=msg.reqid) - - else: - log.error(f'Unknown order command: {request_msg}') - - -@tractor.context -async def trades_dialogue( - - ctx: tractor.Context, - loglevel: str = None, - -) -> AsyncIterator[dict[str, Any]]: - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - accounts_def = config.load_accounts(['ib']) - - global _accounts2clients - global _client_cache - - # deliver positions to subscriber before anything else - all_positions = [] - accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - - async with ( - trio.open_nursery() as nurse, - open_client_proxies() as (proxies, aioclients), - ): - # for account, client in _accounts2clients.items(): - for account, proxy in proxies.items(): - - client = aioclients[account] - - async def open_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_stream) - - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - for client in aioclients.values(): - for pos in client.positions(): - - msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - all_positions.append(msg.dict()) - - trades: list[dict] = [] - for proxy in proxies.values(): - trades.append(await proxy.trades()) - - log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - - await ctx.started(( - all_positions, - tuple(name for name in accounts_def if name in accounts), - )) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) - - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def - ) - - # block until cancelled - await trio.sleep_forever() - - -async def deliver_trade_events( - - trade_event_stream: trio.MemoryReceiveChannel, - ems_stream: tractor.MsgStream, - accounts_def: dict[str, str], - -) -> None: - '''Format and relay all trade events for a given client to the EMS. - - ''' - action_map = {'BOT': 'buy', 'SLD': 'sell'} - - # TODO: for some reason we can receive a ``None`` here when the - # ib-gw goes down? 
Not sure exactly how that's happening looking - # at the eventkit code above but we should probably handle it... - async for event_name, item in trade_event_stream: - - log.info(f'ib sending {event_name}:\n{pformat(item)}') - - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? - - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], - - if event_name == 'status': - - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... - - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus - - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( - - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - account=accounts_def.inverse[trade.order.account], - - # everyone doin camel case.. - status=status.status.lower(), # force lower case - - filled=status.filled, - reason=status.whyHeld, - - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, - - broker_details={'name': 'ib'}, - ) - - elif event_name == 'fill': - - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. - - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution - - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } - - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not - - action=action_map[execu.side], - size=execu.shares, - price=execu.price, - - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], - - ) - - elif event_name == 'error': - - err: dict = item - - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) - - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') - - # TODO: what schema for this msg if we're going to make it - # portable across all backends? 
- # msg = BrokerdError(**err) - continue - - elif event_name == 'position': - msg = pack_position(item) - msg.account = accounts_def.inverse[msg.account] - - elif event_name == 'event': - - # it's either a general system status event or an external - # trade event? - log.info(f"TWS system status: \n{pformat(item)}") - - # TODO: support this again but needs parsing at the callback - # level... - # reqid = item.get('reqid', 0) - # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") - - continue - - # msg.reqid = 'tws-' + str(-1 * reqid) - - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - # msg.broker_details['external_src'] = 'tws' - - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) - - -def load_flex_trades( - path: Optional[str] = None, - -) -> dict[str, str]: - - from pprint import pprint - from ib_insync import flexreport, util - - conf = get_config() - - if not path: - # load ``brokers.toml`` and try to get the flex - # token and query id that must be previously defined - # by the user. - token = conf.get('flex_token') - if not token: - raise ValueError( - 'You must specify a ``flex_token`` field in your' - '`brokers.toml` in order load your trade log, see our' - 'intructions for how to set this up here:\n' - 'PUT LINK HERE!' - ) - - qid = conf['flex_trades_query_id'] - - # TODO: hack this into our logging - # system like we do with the API client.. - util.logToConsole() - - # TODO: rewrite the query part of this with async..httpx? - report = flexreport.FlexReport( - token=token, - queryId=qid, - ) - - else: - # XXX: another project we could potentially look at, - # https://pypi.org/project/ibflex/ - report = flexreport.FlexReport(path=path) - - trade_entries = report.extract('Trade') - trades = { - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - str(t.__dict__['tradeID']): t.__dict__ - for t in trade_entries - } - - ln = len(trades) - log.info(f'Loaded {ln} trades from flex query') - - trades_by_account = {} - for tid, trade in trades.items(): - trades_by_account.setdefault( - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - str(trade['accountId']), {} - )[tid] = trade - - section = {'ib': trades_by_account} - pprint(section) - - # TODO: load the config first and append in - # the new trades loaded here.. 
- try: - config.write(section, 'trades') - except KeyError: - import pdbpp; pdbpp.set_trace() # noqa - - -if __name__ == '__main__': - load_flex_trades() From bf7397f031b6cee4f36a353c76a999a17724342d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 15:56:12 -0400 Subject: [PATCH 4/6] Rename `.client` -> `.api` --- piker/brokers/ib/__init__.py | 4 ++-- piker/brokers/ib/{client.py => api.py} | 0 piker/brokers/ib/broker.py | 2 +- piker/brokers/ib/feed.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename piker/brokers/ib/{client.py => api.py} (100%) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 2d9c198e..3f6504a1 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -30,7 +30,7 @@ Sub-modules within break into the core functionalities: tracking.. """ -from .client import ( +from .api import ( get_client, ) from .feed import ( @@ -51,7 +51,7 @@ __all__ = [ # tractor RPC enable arg __enable_modules__: list[str] = [ - 'client', + 'api', 'feed', 'broker', ] diff --git a/piker/brokers/ib/client.py b/piker/brokers/ib/api.py similarity index 100% rename from piker/brokers/ib/client.py rename to piker/brokers/ib/api.py diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 6bd35b37..50532bfd 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -56,7 +56,7 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) -from .client import ( +from .api import ( _adhoc_futes_set, log, get_config, diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index d533af60..1b2bfb45 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -40,7 +40,7 @@ from trio_typing import TaskStatus from piker.data._sharedmem import ShmArray from .._util import SymbolNotFound, NoData -from .client import ( +from .api import ( _adhoc_futes_set, log, load_aio_clients, From 569674517fd1c17429ce51eaf8c0405cff65d6ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 16:28:08 -0400 Subject: [PATCH 5/6] Hack client check for `ib` using flag --- piker/brokers/ib/api.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 5405ab27..d2fb00dc 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1248,6 +1248,7 @@ async def open_client_proxy( @acm async def get_client( + is_brokercheck: bool = False, **kwargs, ) -> Client: @@ -1256,6 +1257,11 @@ async def get_client( a method proxy to it. ''' + # hack for `piker brokercheck ib`.. 
+ if is_brokercheck: + yield Client + return + from .feed import open_data_client # TODO: the IPC via portal relay layer for when this current From 1345b250bc084436ca2aa8cbfeb768ac3b039e94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Jun 2022 09:48:51 -0400 Subject: [PATCH 6/6] Import missing `_accounts2clients` table --- piker/brokers/ib/broker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 50532bfd..721b6da8 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -57,6 +57,7 @@ from piker.clearing._messages import ( BrokerdError, ) from .api import ( + _accounts2clients, _adhoc_futes_set, log, get_config, @@ -112,8 +113,6 @@ async def handle_order_requests( ) -> None: - global _accounts2clients - request_msg: dict async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') @@ -271,7 +270,6 @@ async def trades_dialogue( accounts_def = config.load_accounts(['ib']) - global _accounts2clients global _client_cache # deliver positions to subscriber before anything else @@ -283,7 +281,6 @@ async def trades_dialogue( trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): - # for account, client in _accounts2clients.items(): for account, proxy in proxies.items(): client = aioclients[account]
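
As a side note on the symbol search hunk in the feed factor-out above:
the final result set is just two fuzzy-score passes unioned together
via py3.9+ ``dict`` merge syntax. A standalone sketch of that merge
step, assuming ``fuzzy`` refers to ``fuzzywuzzy.process`` (the import
isn't shown in the hunk) and with made-up ad-hoc set entries:

    from fuzzywuzzy import process as fuzzy

    # hypothetical stand-in for the module defined ``_adhoc_futes_set``
    _adhoc_futes_set = {'mes.globex', 'brr.cmecrypto'}

    def merge_matches(
        pattern: str,
        stock_results: list[str],
    ) -> dict[str, dict]:
        # strict cutoff for the small ad-hoc set..
        adhoc_matches = fuzzy.extractBests(
            pattern,
            list(_adhoc_futes_set),
            score_cutoff=90,
        )
        adhoc_match_results = {i[0]: {} for i in adhoc_matches}

        # looser cutoff for the api provided stock symbols
        stock_matches = fuzzy.extractBests(
            pattern,
            stock_results,
            score_cutoff=50,
        )

        # NB: the right hand side wins on key collisions
        return adhoc_match_results | {
            item[0]: {} for item in stock_matches
        }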
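
For reference, the ``asyncio`` -> ``trio`` event relay wiring used by
``trades_dialogue()`` above reduces to the following minimal sketch;
the ``aio_event_relay()`` task here is a hypothetical stand-in for
``recv_trade_updates()`` and the whole thing assumes it runs inside an
actor spawned in ``tractor``'s aio mode:

    import asyncio

    import tractor
    import trio


    async def aio_event_relay(
        to_trio: trio.abc.SendChannel,
    ) -> None:
        # runs on the ``asyncio`` side; this first send is the sync
        # handshake and shows up as ``first`` on the ``trio`` side.
        to_trio.send_nowait(None)

        # relay a few fake "events" then exit
        for i in range(3):
            to_trio.send_nowait(('status', i))
            await asyncio.sleep(0.1)


    async def consume_events() -> None:
        async with tractor.to_asyncio.open_channel_from(
            aio_event_relay,
        ) as (first, event_stream):
            assert first is None

            # ``event_stream`` is a ``trio`` mem chan just like
            # ``trade_event_stream`` in ``trades_dialogue()``
            for _ in range(3):
                event_name, item = await event_stream.receive()
                print(event_name, item)

Extra kwargs passed to ``open_channel_from()`` (eg. the
``client=client`` in ``trades_dialogue()``) are forwarded to the target
task, which is how each relay task gets bound to its own api client.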