diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 1d4ffc41..79dfc5f1 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -100,8 +100,9 @@ class NonShittyWrapper(Wrapper): def tcpDataArrived(self): """Override time stamps to be floats for now. """ - # use a float to store epoch time instead of datetime - self.lastTime = time.time() + # use a ns int to store epoch time instead of datetime + self.lastTime = time.time_ns() + for ticker in self.pendingTickers: ticker.rtTime = None ticker.ticks = [] @@ -109,6 +110,18 @@ class NonShittyWrapper(Wrapper): ticker.domTicks = [] self.pendingTickers = set() + def execDetails( + self, + reqId: int, + contract: Contract, + execu, + ): + """ + Get rid of datetime on executions. + """ + execu.time = execu.time.timestamp() + return super().execDetails(reqId, contract, execu) + class NonShittyIB(ibis.IB): """The beginning of overriding quite a few decisions in this lib. @@ -121,7 +134,7 @@ class NonShittyIB(ibis.IB): # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) - self.errorEvent += self._onError + # self.errorEvent += self._onError self.client.apiEnd += self.disconnectedEvent self._logger = logging.getLogger('ib_insync.ib') @@ -220,9 +233,6 @@ class Client: df = ibis.util.df(bars) return bars, from_df(df) - def onError(self, reqId, errorCode, errorString, contract) -> None: - breakpoint() - async def search_stocks( self, pattern: str, @@ -348,9 +358,6 @@ class Client: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] - # head = await self.get_head_time(contract) - # print(head) - except IndexError: raise ValueError(f"No contract could be found {con}") @@ -423,9 +430,11 @@ class Client: ticker = await ticker.updateEvent return contract, ticker - def submit_limit( + # async to be consistent for the client proxy, and cuz why not. + async def submit_limit( self, - contract: Contract, + oid: str, + symbol: str, price: float, action: str = 'BUY', quantity: int = 100, @@ -433,29 +442,70 @@ class Client: """Place an order and return integer request id provided by client. """ + try: + contract = self._contracts[symbol] + except KeyError: + # require that the symbol has been previously cached by + # a data feed request - ensure we aren't making orders + # against non-known prices. + raise RuntimeError("Can not order {symbol}, no live feed?") + + # contract.exchange = 'SMART' + trade = self.ib.placeOrder( + contract, Order( - self, + # orderId=oid, + action=action.upper(), # BUY/SELL orderType='LMT', - action=action, - totalQuantity=quantity, lmtPrice=price, - ) + totalQuantity=quantity, + outsideRth=True, + + optOutSmartRouting=True, + routeMarketableToBbo=True, + designatedLocation='SMART', + ), ) return trade.order.orderId + async def submit_cancel( + self, + oid: str, + ) -> None: + """Send cancel request for order id ``oid``. + + """ + self.ib.cancelOrder( + Order( + orderId=oid, + clientId=self.ib.client.clientId, + ) + ) + async def recv_trade_updates( self, - to_trio, + to_trio: trio.abc.SendChannel, ) -> None: """Stream a ticker using the std L1 api. """ # contract = contract or (await self.find_contract(symbol)) + self.inline_errors(to_trio) - def push(eventkit_obj, trade): + def push_tradesies(eventkit_obj, trade, fill=None): """Push events to trio task. """ + # if fill is not None: + # heyoo we executed, and thanks to ib_insync + # we have to handle the callback signature differently + # due to its consistently non-consistent design. + + # yet again convert the datetime since they aren't + # ipc serializable... + # fill.time = fill.time.timestamp + # trade.fill = fill + print(f'{eventkit_obj}: {trade}') log.debug(trade) if trade is None: @@ -468,21 +518,62 @@ class Client: # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. log.exception(f'Disconnected from {eventkit_obj} updates') - eventkit_obj.disconnect(push) + eventkit_obj.disconnect(push_tradesies) # hook up to the weird eventkit object - event stream api for ev_name in [ - # 'newOrderEvent', 'orderModifyEvent', 'cancelOrderEvent', - 'openOrderEvent', 'orderStatusEvent', 'execDetailsEvent', - # 'commissionReportEvent', 'updatePortfolioEvent', 'positionEvent', + 'orderStatusEvent', + 'execDetailsEvent', + # XXX: not sure yet if we need these + # 'commissionReportEvent', + # 'updatePortfolioEvent', + # 'positionEvent', + + # XXX: these all seem to be weird ib_insync intrernal + # events that we probably don't care that much about + # given the internal design is wonky af.. + # 'newOrderEvent', + # 'orderModifyEvent', + # 'cancelOrderEvent', + # 'openOrderEvent', ]: eventkit_obj = getattr(self.ib, ev_name) - handler = partial(push, eventkit_obj) + handler = partial(push_tradesies, eventkit_obj) eventkit_obj.connect(handler) # let the engine run and stream await self.ib.disconnectedEvent + def inline_errors( + self, + to_trio: trio.abc.SendChannel, + ) -> None: + # connect error msgs + def push_err( + reqId: int, + errorCode: int, + errorString: str, + contract: Contract, + ) -> None: + log.error(errorString) + try: + to_trio.send_nowait( + {'error': { + 'brid': reqId, + 'message': errorString, + 'contract': contract, + }} + ) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + log.exception('Disconnected from errorEvent updates') + self.ib.errorEvent.disconnect(push_err) + + self.ib.errorEvent.connect(push_err) + # default config ports _tws_port: int = 7497 @@ -536,11 +627,13 @@ async def _aio_get_client( else: raise ConnectionRefusedError(_err) + # create and cache try: client = Client(ib) _client_cache[(host, port)] = client log.debug(f"Caching client for {(host, port)}") yield client + except BaseException: ib.disconnect() raise @@ -607,8 +700,7 @@ class _MethodProxy: **kwargs ) -> Any: return await self._portal.run( - __name__, - '_trio_run_client_method', + _trio_run_client_method, method=meth, **kwargs ) @@ -641,7 +733,8 @@ async def get_client( infect_asyncio=True, **kwargs ) as portal: - yield get_client_proxy(portal) + proxy_client = get_client_proxy(portal) + yield proxy_client # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -997,11 +1090,25 @@ async def stream_trades( method='recv_trade_updates', ) - # more great work by our friend ib_insync... - # brutallll bby. - none = await stream.__anext__() - print(f'Cuz sending {none} makes sense..') + # init startup msg + yield {'trade_events': 'started'} - async for trade_event in stream: - msg = asdict(trade_event) - yield {'all': msg} + async for event in stream: + from pprint import pprint + + if not isinstance(event, dict): + # remove trade log entries for now until we figure out if we + # even want to retreive them this way and because they're using + # datetimes + event = asdict(event) + pprint(event) + event.pop('log', None) + + # fills = event.get('fills') + # if fills: + # await tractor.breakpoint() + # for fill in fills: + # fill['time'] = fill['time'].timestamp + # exec = fill.pop('execution') + + yield {'trade_events': event}