From 280739b51a6c180cd7fa8f184d732fb3db4c081d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Jan 2021 10:54:09 -0500 Subject: [PATCH] Add trades data streaming support --- piker/brokers/ib.py | 156 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 132 insertions(+), 24 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index d0645dfe..6d310c0d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -35,6 +35,7 @@ import time from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails +from ib_insync.order import Order from ib_insync.ticker import Ticker import ib_insync as ibis from ib_insync.wrapper import Wrapper @@ -53,7 +54,6 @@ from ..data import ( ) from ..data._source import from_df from ._util import SymbolNotFound -from .._async_utils import maybe_with_if log = get_logger(__name__) @@ -150,6 +150,9 @@ class Client: self.ib = ib self.ib.RaiseRequestErrors = True + # contract cache + self._contracts: Dict[str, Contract] = {} + # NOTE: the ib.client here is "throttled" to 45 rps by default async def bars( @@ -283,6 +286,20 @@ class Client: currency: str = 'USD', **kwargs, ) -> Contract: + + # TODO: we can't use this currently because + # ``wrapper.starTicker()`` currently cashes ticker instances + # which means getting a singel quote will potentially look up + # a quote for a ticker that it already streaming and thus run + # into state clobbering (eg. List: Ticker.ticks). It probably + # makes sense to try this once we get the pub-sub working on + # individual symbols... + # try: + # # give the cache a go + # return self._contracts[symbol] + # except KeyError: + # log.debug(f'Looking up contract for {symbol}') + # use heuristics to figure out contract "type" try: sym, exch = symbol.upper().rsplit('.', maxsplit=1) @@ -336,6 +353,8 @@ class Client: except IndexError: raise ValueError(f"No contract could be found {con}") + + self._contracts[symbol] = contract return contract async def get_head_time( @@ -401,7 +420,65 @@ class Client: contract, snapshot=True, ) - return contract, (await ticker.updateEvent) + ticker = await ticker.updateEvent + return contract, ticker + + async def submit_limit( + self, + contract: Contract, + price: float, + action: str = 'BUY', + quantity: int = 100, + ) -> None: + self.ib.placeOrder( + Order( + self, + orderType='LMT', + action=action, + totalQuantity=quantity, + lmtPrice=price, + # **kwargs + ) + ) + + async def recv_trade_updates( + self, + to_trio, + ) -> None: + """Stream a ticker using the std L1 api. + """ + # contract = contract or (await self.find_contract(symbol)) + + def push(eventkit_obj, trade): + """Push events to trio task. + + """ + print(f'{eventkit_obj}: {trade}') + log.debug(trade) + if trade is None: + print("YO WTF NONE") + try: + to_trio.send_nowait(trade) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + log.exception(f'Disconnected from {eventkit_obj} updates') + eventkit_obj.disconnect(push) + + # hook up to the weird eventkit object - event stream api + for ev_name in [ + # 'newOrderEvent', 'orderModifyEvent', 'cancelOrderEvent', + 'openOrderEvent', 'orderStatusEvent', 'execDetailsEvent', + # 'commissionReportEvent', 'updatePortfolioEvent', 'positionEvent', + ]: + eventkit_obj = getattr(self.ib, ev_name) + handler = partial(push, eventkit_obj) + eventkit_obj.connect(handler) + + # let the engine run and stream + await self.ib.disconnectedEvent # default config ports @@ -586,10 +663,11 @@ def normalize( # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: - td = tick._asdict() - td['type'] = tick_types.get(td['tickType'], 'n/a') + if tick: + td = tick._asdict() + td['type'] = tick_types.get(td['tickType'], 'n/a') - new_ticks.append(td) + new_ticks.append(td) ticker.ticks = new_ticks @@ -788,7 +866,7 @@ async def stream_quotes( # no real volume so we have to calculate the price suffix = 'secType' calc_price = True - ticker = first_ticker + # ticker = first_ticker # pass first quote asap quote = normalize(first_ticker, calc_price=calc_price) @@ -814,27 +892,32 @@ async def stream_quotes( calc_price = False # should be real volume for contract - # wait for real volume on feed (trading might be closed) - async for ticker in stream: + with trio.move_on_after(10) as cs: + # wait for real volume on feed (trading might be closed) + async for ticker in stream: - # for a real volume contract we rait for the first - # "real" trade to take place - if not calc_price and not ticker.rtTime: - # spin consuming tickers until we get a real market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] + # for a real volume contract we rait for the first + # "real" trade to take place + if not calc_price and not ticker.rtTime: + # spin consuming tickers until we get a real market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] - # tell incrementer task it can start - _buffer.shm_incrementing(key).set() + # tell incrementer task it can start + _buffer.shm_incrementing(key).set() + + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + if cs.cancelled_caught: + await tractor.breakpoint() - # XXX: this works because we don't use - # ``aclosing()`` above? - break # real-time stream async for ticker in stream: @@ -895,3 +978,28 @@ async def stream_quotes( # ugh, clear ticks since we've consumed them ticker.ticks = [] + + +@tractor.msg.pub +async def stream_trades( + loglevel: str = None, + get_topics: Callable = None, +) -> AsyncIterator[Dict[str, Any]]: + + log.error('startedddd daa tradeeeez feeeedddzzz') + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + stream = await _trio_run_client_method( + method='recv_trade_updates', + ) + + # more great work by our friend ib_insync... + # brutallll bby. + none = await stream.__anext__() + print(f'Cuz sending {none} makes sense..') + + async for trade_event in stream: + msg = asdict(trade_event) + yield {'all': msg}