From bdc02010cfa3545f59a4457c7a345ba4bee8e055 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jan 2021 19:40:09 -0500 Subject: [PATCH] Finally, sanely normalize local trades event data --- piker/brokers/ib.py | 143 +++++++++++++++++++++++++------------------- 1 file changed, 82 insertions(+), 61 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 4b9c6579..a20ea7ef 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -32,6 +32,8 @@ import inspect import itertools import time +import trio +import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails @@ -40,15 +42,12 @@ from ib_insync.ticker import Ticker import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client -import trio -import tractor from ..log import get_logger, get_console_log from ..data import ( maybe_spawn_brokerd, iterticks, attach_shm_array, - # get_shm_token, subscribe_ohlc_for_increment, _buffer, ) @@ -217,7 +216,6 @@ class Client: # barSizeSetting='1 min', - # always use extended hours useRTH=False, @@ -306,6 +304,10 @@ class Client: # into state clobbering (eg. List: Ticker.ticks). It probably # makes sense to try this once we get the pub-sub working on # individual symbols... + + # XXX UPDATE: we can probably do the tick/trades scraping + # inside our eventkit handler instead to bypass this entirely? + # try: # # give the cache a go # return self._contracts[symbol] @@ -386,7 +388,6 @@ class Client: to_trio, opts: Tuple[int] = ('375', '233',), contract: Optional[Contract] = None, - # opts: Tuple[int] = ('459',), ) -> None: """Stream a ticker using the std L1 api. """ @@ -435,11 +436,11 @@ class Client: # async to be consistent for the client proxy, and cuz why not. async def submit_limit( self, - oid: str, + oid: str, # XXX: see return value symbol: str, price: float, - action: str = 'BUY', - quantity: int = 100, + action: str, + size: int = 100, ) -> int: """Place an order and return integer request id provided by client. @@ -452,16 +453,14 @@ class Client: # against non-known prices. raise RuntimeError("Can not order {symbol}, no live feed?") - # contract.exchange = 'SMART' - trade = self.ib.placeOrder( contract, Order( - # orderId=oid, + # orderId=oid, # stupid api devs.. action=action.upper(), # BUY/SELL orderType='LMT', lmtPrice=price, - totalQuantity=quantity, + totalQuantity=size, outsideRth=True, optOutSmartRouting=True, @@ -469,18 +468,21 @@ class Client: designatedLocation='SMART', ), ) + + # ib doesn't support setting your own id outside + # their own weird client int counting ids.. return trade.order.orderId async def submit_cancel( self, - oid: str, + reqid: str, ) -> None: """Send cancel request for order id ``oid``. """ self.ib.cancelOrder( Order( - orderId=oid, + orderId=reqid, clientId=self.ib.client.clientId, ) ) @@ -491,43 +493,37 @@ class Client: ) -> None: """Stream a ticker using the std L1 api. """ - # contract = contract or (await self.find_contract(symbol)) self.inline_errors(to_trio) def push_tradesies(eventkit_obj, trade, fill=None): """Push events to trio task. """ - # if fill is not None: - # heyoo we executed, and thanks to ib_insync - # we have to handle the callback signature differently - # due to its consistently non-consistent design. + if fill is not None: + # execution details event + item = ('fill', (trade, fill)) + else: + item = ('status', trade) - # yet again convert the datetime since they aren't - # ipc serializable... - # fill.time = fill.time.timestamp - # trade.fill = fill + log.info(f'{eventkit_obj}: {item}') - print(f'{eventkit_obj}: {trade}') - log.debug(trade) - if trade is None: - print("YO WTF NONE") try: - to_trio.send_nowait(trade) + to_trio.send_nowait(item) except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. log.exception(f'Disconnected from {eventkit_obj} updates') eventkit_obj.disconnect(push_tradesies) # hook up to the weird eventkit object - event stream api for ev_name in [ - 'orderStatusEvent', - 'execDetailsEvent', - # XXX: not sure yet if we need these + 'orderStatusEvent', # all order updates + 'execDetailsEvent', # all "fill" updates + # 'commissionReportEvent', + # XXX: ugh, it is a separate event from IB and it's + # emitted as follows: + # self.ib.commissionReportEvent.emit(trade, fill, report) + + # XXX: not sure yet if we need these # 'updatePortfolioEvent', # 'positionEvent', @@ -559,13 +555,13 @@ class Client: ) -> None: log.error(errorString) try: - to_trio.send_nowait( - {'error': { - 'brid': reqId, - 'message': errorString, - 'contract': contract, - }} - ) + to_trio.send_nowait(( + 'error', + # error "object" + {'reqid': reqId, + 'message': errorString, + 'contract': contract} + )) except trio.BrokenResourceError: # XXX: eventkit's ``Event.emit()`` for whatever redic # reason will catch and ignore regular exceptions @@ -838,8 +834,8 @@ async def fill_bars( method='bars', symbol=sym, end_dt=next_dt, - ) + shm.push(bars_array, prepend=True) i += 1 next_dt = bars[0].date @@ -1092,25 +1088,50 @@ async def stream_trades( method='recv_trade_updates', ) - # init startup msg - yield {'trade_events': 'started'} + # startup msg + yield {'local_trades': 'start'} - async for event in stream: - from pprint import pprint + async for event_name, item in stream: - if not isinstance(event, dict): - # remove trade log entries for now until we figure out if we - # even want to retreive them this way and because they're using - # datetimes - event = asdict(event) - pprint(event) - event.pop('log', None) + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # fills = event.get('fills') - # if fills: - # await tractor.breakpoint() - # for fill in fills: - # fill['time'] = fill['time'].timestamp - # exec = fill.pop('execution') + if event_name == 'status': - yield {'trade_events': event} + # unwrap needed data from ib_insync internal objects + trade = item + status = trade.orderStatus + + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = { + 'reqid': trade.order.orderId, + 'status': status.status, + 'filled': status.filled, + 'reason': status.whyHeld, + + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + 'remaining': status.remaining, + } + + elif event_name == 'fill': + trade, fill = item + execu = fill.execution + msg = { + 'reqid': execu.orderId, + 'execid': execu.execId, + + # supposedly IB server fill time + 'broker_time': execu.time, # converted to float by us + 'time': fill.time, # ns in main TCP handler by us + 'time_ns': time.time_ns(), # cuz why not + 'action': {'BOT': 'buy', 'SLD': 'sell'}[execu.side], + 'size': execu.shares, + 'price': execu.price, + } + + elif event_name == 'error': + msg = item + + yield {'local_trades': (event_name, msg)}