ib: relay submission errors, allow adhoc mkt overrides

This is a tricky edge case we weren't handling prior; an example is
submitting a limit order with a price tick precision which mismatches
that supported (probably bc IB reported the wrong one..) and IB responds
immediately with an error event (via a special code..) but doesn't
include any `Trade` object(s) nor details beyond the `reqid`. So, we
have to do a little reverse EMS order lookup on our own and ideally
indicate to the requester which order failed and *why*.

To enable this we,
- create a `flows: OrderDialogs` instance and pass it to most order/event relay
  tasks, particularly ensuring we update update ASAP in `handle_order_requests()`
  such that any successful submit has an `Ack` recorded in the flow.
- on such errors lookup the `.symbol` / `Order` from the `flow` and
  respond back to the EMS with as many details as possible about the
  prior msg history.
- always explicitly relay `error` events which don't fall into the
  sensible filtered set and wrap in
  a `BrokerdError.broker_details['flow']: dict` snapshot for the EMS.
- in `symbols.get_mkt_info()` support adhoc lookup for `MktPair` inputs
  and when defined we re-construct with those inputs; in this case we do
  this for a first mkt: `'vtgn.nasdaq'`..
account_tests
Tyler Goodlet 2023-08-10 10:31:00 -04:00
parent 562d027ee6
commit f66a1f8b23
2 changed files with 65 additions and 41 deletions

View File

@ -20,6 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
""" """
from __future__ import annotations from __future__ import annotations
from contextlib import ExitStack from contextlib import ExitStack
from collections import ChainMap
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -135,18 +136,19 @@ async def handle_order_requests(
action: str = request_msg['action'] action: str = request_msg['action']
account: str = request_msg['account'] account: str = request_msg['account']
acct_number = accounts_def.get(account) acct_number = accounts_def.get(account)
oid: str = request_msg['oid']
if not acct_number: if not acct_number:
log.error( log.error(
f'An IB account number for name {account} is not found?\n' f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.' 'Make sure you have all TWS and GW instances running.'
) )
await ems_order_stream.send( err_msg = BrokerdError(
BrokerdError( oid=oid,
oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
) )
) await ems_order_stream.send(err_msg)
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -155,11 +157,12 @@ async def handle_order_requests(
f'An IB client for account name {account} is not found.\n' f'An IB client for account name {account} is not found.\n'
'Make sure you have all TWS and GW instances running.' 'Make sure you have all TWS and GW instances running.'
) )
await ems_order_stream.send(BrokerdError( err_msg = BrokerdError(
oid=request_msg['oid'], oid=oid,
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
)) )
await ems_order_stream.send(err_msg)
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -185,23 +188,26 @@ async def handle_order_requests(
account=acct_number, account=acct_number,
reqid=reqid, reqid=reqid,
) )
str_reqid: str = str(reqid)
if reqid is None: if reqid is None:
await ems_order_stream.send(BrokerdError( err_msg = BrokerdError(
oid=request_msg['oid'], oid=oid,
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
)) )
await ems_order_stream.send(err_msg)
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
ack = BrokerdOrderAck( ack = BrokerdOrderAck(
# ems order request id # ems order request id
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=str_reqid,
account=account, account=account,
) )
await ems_order_stream.send(ack) await ems_order_stream.send(ack)
flows.add_msg(reqid, ack.to_dict()) flows.add_msg(str_reqid, order.to_dict())
flows.add_msg(str_reqid, ack.to_dict())
elif action == 'cancel': elif action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
@ -441,7 +447,7 @@ async def aggr_open_orders(
deats = await proxy.con_deats(contracts=[con]) deats = await proxy.con_deats(contracts=[con])
fqme = list(deats)[0] fqme = list(deats)[0]
reqid = order.orderId reqid: str = str(order.orderId)
# TODO: maybe embed a ``BrokerdOrder`` instead # TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client # since then we can directly load it on the client
@ -449,7 +455,7 @@ async def aggr_open_orders(
msg = Status( msg = Status(
time_ns=time.time_ns(), time_ns=time.time_ns(),
resp='open', resp='open',
oid=str(reqid), oid=reqid,
reqid=reqid, reqid=reqid,
# embedded order info # embedded order info
@ -1213,6 +1219,8 @@ async def deliver_trade_events(
# `Client.inline_errors()::push_err()` # `Client.inline_errors()::push_err()`
err: dict = item err: dict = item
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code'] code: int = err['error_code']
if code in { if code in {
200, # uhh 200, # uhh
@ -1221,43 +1229,42 @@ async def deliver_trade_events(
162, 162,
165, 165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented..
# 'No market data during competing live session' # 'No market data during competing live session'
1669, 1669,
}: }:
continue continue
reqid: str = err['reqid'] reqid: str = str(err['reqid'])
acnt: str = flows.get(reqid)['account']
reason: str = err['reason'] reason: str = err['reason']
if err['reqid'] == -1: if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}') log.error(f'TWS external order error:\n{pformat(err)}')
flow: ChainMap = flows.get(reqid)
# TODO: we don't want to relay data feed / lookup errors # TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here.. # so we need some further filtering logic here..
# for most cases the 'status' block above should take # for most cases the 'status' block above should take
# care of this. # care of this.
await ems_stream.send( err_msg = BrokerdError(
BrokerdStatus(
status='error',
reqid=reqid, reqid=reqid,
reason=reason, reason=reason,
time_ns=time.time_ns(), broker_details={
account=acnt, 'name': 'ib',
broker_details={'name': 'ib'}, 'flow': dict(flow),
},
) )
) flows.add_msg(reqid, err_msg.to_dict())
await ems_stream.send(err_msg)
canceled = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(), # cuz why not
status='canceled',
reason=reason,
account=acnt,
broker_details={'name': 'ib'},
)
await ems_stream.send(canceled)
flows.add_msg(reqid, canceled.to_dict())
case 'event': case 'event':

View File

@ -132,6 +132,12 @@ _adhoc_fiat_set = set((
).split(' ,') ).split(' ,')
) )
# manually discovered tick discrepancies,
# onl god knows how or why they'd cuck these up..
_adhoc_mkt_infos: dict[int | str, dict] = {
'vtgn.nasdaq': {'price_tick': Decimal('0.01')},
}
# map of symbols to contract ids # map of symbols to contract ids
_adhoc_symbol_map = { _adhoc_symbol_map = {
@ -511,6 +517,7 @@ async def get_mkt_info(
venue = con.primaryExchange or con.exchange venue = con.primaryExchange or con.exchange
price_tick: Decimal = Decimal(str(details.minTick)) price_tick: Decimal = Decimal(str(details.minTick))
# price_tick: Decimal = Decimal('0.01')
if atype == 'stock': if atype == 'stock':
# XXX: GRRRR they don't support fractional share sizes for # XXX: GRRRR they don't support fractional share sizes for
@ -541,14 +548,15 @@ async def get_mkt_info(
atype='fiat', atype='fiat',
tx_tick=Decimal('0.01'), # right? tx_tick=Decimal('0.01'), # right?
) )
dst = Asset(
mkt = MktPair(
dst=Asset(
name=con.symbol.lower(), name=con.symbol.lower(),
atype=atype, atype=atype,
tx_tick=size_tick, tx_tick=size_tick,
), )
mkt = MktPair(
src=src, src=src,
dst=dst,
price_tick=price_tick, price_tick=price_tick,
size_tick=size_tick, size_tick=size_tick,
@ -563,6 +571,15 @@ async def get_mkt_info(
_fqme_without_src=(atype != 'fiat'), _fqme_without_src=(atype != 'fiat'),
) )
# just.. wow.
if entry := _adhoc_mkt_infos.get(mkt.bs_fqme):
log.warning(f'Frickin {mkt.fqme} has an adhoc {entry}..')
new = mkt.to_dict()
new['price_tick'] = entry['price_tick']
new['src'] = src
new['dst'] = dst
mkt = MktPair(**new)
# if possible register the bs_mktid to the just-built # if possible register the bs_mktid to the just-built
# mkt so that it can be retreived by order mode tasks later. # mkt so that it can be retreived by order mode tasks later.
# TODO NOTE: this is going to be problematic if/when we split # TODO NOTE: this is going to be problematic if/when we split