diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index f1070a43..e3403397 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -20,6 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations from contextlib import ExitStack +from collections import ChainMap from functools import partial from pprint import pformat import time @@ -135,18 +136,19 @@ async def handle_order_requests( action: str = request_msg['action'] account: str = request_msg['account'] acct_number = accounts_def.get(account) + oid: str = request_msg['oid'] + if not acct_number: log.error( f'An IB account number for name {account} is not found?\n' 'Make sure you have all TWS and GW instances running.' ) - await ems_order_stream.send( - BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No account found: `{account}` ?', - ) + err_msg = BrokerdError( + oid=oid, + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', ) + await ems_order_stream.send(err_msg) continue client = _accounts2clients.get(account) @@ -155,11 +157,12 @@ async def handle_order_requests( f'An IB client for account name {account} is not found.\n' 'Make sure you have all TWS and GW instances running.' ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], + err_msg = BrokerdError( + oid=oid, symbol=request_msg['symbol'], reason=f'No api client loaded for account: `{account}` ?', - )) + ) + await ems_order_stream.send(err_msg) continue if action in {'buy', 'sell'}: @@ -185,23 +188,26 @@ async def handle_order_requests( account=acct_number, reqid=reqid, ) + str_reqid: str = str(reqid) if reqid is None: - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], + err_msg = BrokerdError( + oid=oid, symbol=request_msg['symbol'], reason='Order already active?', - )) + ) + await ems_order_stream.send(err_msg) # deliver ack that order has been submitted to broker routing ack = BrokerdOrderAck( # ems order request id oid=order.oid, # broker specific request id - reqid=reqid, + reqid=str_reqid, account=account, ) await ems_order_stream.send(ack) - flows.add_msg(reqid, ack.to_dict()) + flows.add_msg(str_reqid, order.to_dict()) + flows.add_msg(str_reqid, ack.to_dict()) elif action == 'cancel': msg = BrokerdCancel(**request_msg) @@ -441,7 +447,7 @@ async def aggr_open_orders( deats = await proxy.con_deats(contracts=[con]) fqme = list(deats)[0] - reqid = order.orderId + reqid: str = str(order.orderId) # TODO: maybe embed a ``BrokerdOrder`` instead # since then we can directly load it on the client @@ -449,7 +455,7 @@ async def aggr_open_orders( msg = Status( time_ns=time.time_ns(), resp='open', - oid=str(reqid), + oid=reqid, reqid=reqid, # embedded order info @@ -1213,6 +1219,8 @@ async def deliver_trade_events( # `Client.inline_errors()::push_err()` err: dict = item + # never relay errors for non-broker related issues + # https://interactivebrokers.github.io/tws-api/message_codes.html code: int = err['error_code'] if code in { 200, # uhh @@ -1221,43 +1229,42 @@ async def deliver_trade_events( 162, 165, + # WARNING codes: + # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes + # Attribute 'Outside Regular Trading Hours' is + # " 'ignored based on the order type and + # destination. PlaceOrder is now ' 'being + # processed.', + 2109, + + # XXX: lol this isn't even documented.. # 'No market data during competing live session' 1669, }: continue - reqid: str = err['reqid'] - acnt: str = flows.get(reqid)['account'] + reqid: str = str(err['reqid']) reason: str = err['reason'] if err['reqid'] == -1: log.error(f'TWS external order error:\n{pformat(err)}') + flow: ChainMap = flows.get(reqid) + # TODO: we don't want to relay data feed / lookup errors # so we need some further filtering logic here.. # for most cases the 'status' block above should take # care of this. - await ems_stream.send( - BrokerdStatus( - status='error', - reqid=reqid, - reason=reason, - time_ns=time.time_ns(), - account=acnt, - broker_details={'name': 'ib'}, - ) - ) - - canceled = BrokerdStatus( + err_msg = BrokerdError( reqid=reqid, - time_ns=time.time_ns(), # cuz why not - status='canceled', reason=reason, - account=acnt, - broker_details={'name': 'ib'}, + broker_details={ + 'name': 'ib', + 'flow': dict(flow), + }, ) - await ems_stream.send(canceled) - flows.add_msg(reqid, canceled.to_dict()) + flows.add_msg(reqid, err_msg.to_dict()) + await ems_stream.send(err_msg) case 'event': diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index c6492414..31cf74a0 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -132,6 +132,12 @@ _adhoc_fiat_set = set(( ).split(' ,') ) +# manually discovered tick discrepancies, +# onl god knows how or why they'd cuck these up.. +_adhoc_mkt_infos: dict[int | str, dict] = { + 'vtgn.nasdaq': {'price_tick': Decimal('0.01')}, +} + # map of symbols to contract ids _adhoc_symbol_map = { @@ -511,6 +517,7 @@ async def get_mkt_info( venue = con.primaryExchange or con.exchange price_tick: Decimal = Decimal(str(details.minTick)) + # price_tick: Decimal = Decimal('0.01') if atype == 'stock': # XXX: GRRRR they don't support fractional share sizes for @@ -541,14 +548,15 @@ async def get_mkt_info( atype='fiat', tx_tick=Decimal('0.01'), # right? ) + dst = Asset( + name=con.symbol.lower(), + atype=atype, + tx_tick=size_tick, + ) mkt = MktPair( - dst=Asset( - name=con.symbol.lower(), - atype=atype, - tx_tick=size_tick, - ), src=src, + dst=dst, price_tick=price_tick, size_tick=size_tick, @@ -563,6 +571,15 @@ async def get_mkt_info( _fqme_without_src=(atype != 'fiat'), ) + # just.. wow. + if entry := _adhoc_mkt_infos.get(mkt.bs_fqme): + log.warning(f'Frickin {mkt.fqme} has an adhoc {entry}..') + new = mkt.to_dict() + new['price_tick'] = entry['price_tick'] + new['src'] = src + new['dst'] = dst + mkt = MktPair(**new) + # if possible register the bs_mktid to the just-built # mkt so that it can be retreived by order mode tasks later. # TODO NOTE: this is going to be problematic if/when we split