ib: handle order errors via `reqid` lookup

Finally this is a reason to use our new `OrderDialogs` abstraction; on
order submission errors IB doesn't really pass back anything other then
the `orderId` and the reason so we have to conduct our own lookup for
a message to relay to the EMS..

So, for every EMS msg we send, add it to the dialog tracker and then use
the `flows: OrderDialogs` for lookup in the case where we need to relay
said error. Also, include sending a `canceled` status such that the
order won't get stuck as a stale entry in the `emsd`'s own dialog table.
For now we just filter out errors that are unrelated from the stream
since there's always going to be stuff to do with live/history data
queries..
account_tests
Tyler Goodlet 2023-08-07 18:19:35 -04:00
parent 85a38d057b
commit ff2bbd5aca
2 changed files with 66 additions and 31 deletions

View File

@ -916,22 +916,20 @@ class Client:
) -> None: ) -> None:
reason = errorString reason: str = errorString
if reqId == -1: if reqId == -1:
# it's a general event? # it's a general event?
key = 'event' key: str = 'event'
log.info(errorString) log.info(errorString)
else: else:
key = 'error' key: str = 'error'
log.error(errorString) log.error(errorString)
try: try:
to_trio.send_nowait(( to_trio.send_nowait((
key, key,
# error "object"
{ {
'type': key, 'type': key,
'reqid': reqId, 'reqid': reqId,

View File

@ -20,7 +20,6 @@ Order and trades endpoints for use with ``piker``'s EMS.
""" """
from __future__ import annotations from __future__ import annotations
from contextlib import ExitStack from contextlib import ExitStack
from dataclasses import asdict
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -64,6 +63,7 @@ from piker.data import (
open_symcache, open_symcache,
SymbologyCache, SymbologyCache,
) )
from piker.clearing import OrderDialogs
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
Status, Status,
@ -124,6 +124,7 @@ async def handle_order_requests(
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
accounts_def: dict[str, str], accounts_def: dict[str, str],
flows: OrderDialogs,
) -> None: ) -> None:
@ -169,8 +170,9 @@ async def handle_order_requests(
# there is no existing order so ask the client to create # there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int # a new one (which it seems to do by allocating an int
# counter - collision prone..) # counter - collision prone..)
reqid = order.reqid reqid: int | None = order.reqid
if reqid is not None: if reqid is not None:
log.error(f'TYPE .reqid: {reqid} -> {type(reqid)}')
reqid = int(reqid) reqid = int(reqid)
# call our client api to submit the order # call our client api to submit the order
@ -191,15 +193,15 @@ async def handle_order_requests(
)) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( ack = BrokerdOrderAck(
BrokerdOrderAck(
# ems order request id # ems order request id
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
account=account, account=account,
) )
) await ems_order_stream.send(ack)
flows.add_msg(reqid, ack.to_dict())
elif action == 'cancel': elif action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
@ -521,6 +523,8 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# task local msg dialog tracking
flows = OrderDialogs()
accounts_def = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
@ -755,6 +759,7 @@ async def open_trade_dialog(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
accounts_def, accounts_def,
flows,
) )
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
@ -767,6 +772,7 @@ async def open_trade_dialog(
proxies, proxies,
ledgers, ledgers,
tables, tables,
flows,
) )
# write account and ledger files immediately! # write account and ledger files immediately!
@ -985,6 +991,8 @@ async def deliver_trade_events(
ledgers, ledgers,
tables, tables,
flows: OrderDialogs,
) -> None: ) -> None:
''' '''
Format and relay all trade events for a given client to emsd. Format and relay all trade events for a given client to emsd.
@ -1013,6 +1021,7 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types # unwrap needed data from ib_insync internal types
trade: Trade = item trade: Trade = item
reqid: str = str(trade.order.orderId)
status: OrderStatus = trade.orderStatus status: OrderStatus = trade.orderStatus
status_str: str = _statuses[status.status] status_str: str = _statuses[status.status]
remaining: float = status.remaining remaining: float = status.remaining
@ -1027,7 +1036,7 @@ async def deliver_trade_events(
# NOTE: should match the value returned from # NOTE: should match the value returned from
# `.submit_limit()` # `.submit_limit()`
reqid=execu.orderId, reqid=reqid,
action=_action_map[execu.side], action=_action_map[execu.side],
size=execu.shares, size=execu.shares,
@ -1040,7 +1049,9 @@ async def deliver_trade_events(
# XXX: required by order mode currently # XXX: required by order mode currently
broker_time=execu.time, broker_time=execu.time,
) )
await ems_stream.send(fill_msg) await ems_stream.send(fill_msg)
flows.add_msg(reqid, fill_msg.to_dict())
if remaining == 0: if remaining == 0:
# emit a closed status on filled statuses where # emit a closed status on filled statuses where
@ -1050,7 +1061,7 @@ async def deliver_trade_events(
# skip duplicate filled updates - we get the deats # skip duplicate filled updates - we get the deats
# from the execution details event # from the execution details event
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=trade.order.orderId, reqid=reqid,
time_ns=time.time_ns(), # cuz why not time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account], account=accounts_def.inverse[trade.order.account],
@ -1067,8 +1078,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg) await ems_stream.send(msg)
continue flows.add_msg(reqid, msg.to_dict())
# XXX: for wtv reason this is a separate event type # XXX: for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra # from IB, not sure why it's needed other then for extra
@ -1199,12 +1209,26 @@ async def deliver_trade_events(
) )
case 'error': case 'error':
# NOTE: see impl deats in
# `Client.inline_errors()::push_err()`
err: dict = item err: dict = item
# f$#$% gawd dammit insync.. code: int = err['error_code']
con = err['contract'] if code in {
if isinstance(con, Contract): 200, # uhh
err['contract'] = asdict(con)
# hist pacing / connectivity
162,
165,
# 'No market data during competing live session'
1669,
}:
continue
reqid: str = err['reqid']
acnt: str = flows.get(reqid)['account']
reason: str = err['reason']
if err['reqid'] == -1: if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}') log.error(f'TWS external order error:\n{pformat(err)}')
@ -1213,14 +1237,27 @@ async def deliver_trade_events(
# so we need some further filtering logic here.. # so we need some further filtering logic here..
# for most cases the 'status' block above should take # for most cases the 'status' block above should take
# care of this. # care of this.
# await ems_stream.send(BrokerdStatus( await ems_stream.send(
# status='error', BrokerdStatus(
# reqid=err['reqid'], status='error',
# reason=err['reason'], reqid=reqid,
# time_ns=time.time_ns(), reason=reason,
# account=accounts_def.inverse[trade.order.account], time_ns=time.time_ns(),
# broker_details={'name': 'ib'}, account=acnt,
# )) broker_details={'name': 'ib'},
)
)
canceled = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(), # cuz why not
status='canceled',
reason=reason,
account=acnt,
broker_details={'name': 'ib'},
)
await ems_stream.send(canceled)
flows.add_msg(reqid, canceled.to_dict())
case 'event': case 'event':