diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 7ab827c3..58fd6a1e 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -916,22 +916,20 @@ class Client: ) -> None: - reason = errorString + reason: str = errorString if reqId == -1: # it's a general event? - key = 'event' + key: str = 'event' log.info(errorString) else: - key = 'error' + key: str = 'error' log.error(errorString) try: to_trio.send_nowait(( key, - - # error "object" { 'type': key, 'reqid': reqId, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 262cbc1d..f1070a43 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -20,7 +20,6 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations from contextlib import ExitStack -from dataclasses import asdict from functools import partial from pprint import pformat import time @@ -64,6 +63,7 @@ from piker.data import ( open_symcache, SymbologyCache, ) +from piker.clearing import OrderDialogs from piker.clearing._messages import ( Order, Status, @@ -124,6 +124,7 @@ async def handle_order_requests( ems_order_stream: tractor.MsgStream, accounts_def: dict[str, str], + flows: OrderDialogs, ) -> None: @@ -169,8 +170,9 @@ async def handle_order_requests( # there is no existing order so ask the client to create # a new one (which it seems to do by allocating an int # counter - collision prone..) - reqid = order.reqid + reqid: int | None = order.reqid if reqid is not None: + log.error(f'TYPE .reqid: {reqid} -> {type(reqid)}') reqid = int(reqid) # call our client api to submit the order @@ -191,15 +193,15 @@ async def handle_order_requests( )) # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - # ems order request id - oid=order.oid, - # broker specific request id - reqid=reqid, - account=account, - ) + ack = BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + account=account, ) + await ems_order_stream.send(ack) + flows.add_msg(reqid, ack.to_dict()) elif action == 'cancel': msg = BrokerdCancel(**request_msg) @@ -521,6 +523,8 @@ async def open_trade_dialog( ) -> AsyncIterator[dict[str, Any]]: + # task local msg dialog tracking + flows = OrderDialogs() accounts_def = config.load_accounts(['ib']) # deliver positions to subscriber before anything else @@ -755,6 +759,7 @@ async def open_trade_dialog( handle_order_requests, ems_stream, accounts_def, + flows, ) # allocate event relay tasks for each client connection @@ -767,6 +772,7 @@ async def open_trade_dialog( proxies, ledgers, tables, + flows, ) # write account and ledger files immediately! @@ -985,6 +991,8 @@ async def deliver_trade_events( ledgers, tables, + flows: OrderDialogs, + ) -> None: ''' Format and relay all trade events for a given client to emsd. @@ -1013,6 +1021,7 @@ async def deliver_trade_events( # unwrap needed data from ib_insync internal types trade: Trade = item + reqid: str = str(trade.order.orderId) status: OrderStatus = trade.orderStatus status_str: str = _statuses[status.status] remaining: float = status.remaining @@ -1027,7 +1036,7 @@ async def deliver_trade_events( # NOTE: should match the value returned from # `.submit_limit()` - reqid=execu.orderId, + reqid=reqid, action=_action_map[execu.side], size=execu.shares, @@ -1040,7 +1049,9 @@ async def deliver_trade_events( # XXX: required by order mode currently broker_time=execu.time, ) + await ems_stream.send(fill_msg) + flows.add_msg(reqid, fill_msg.to_dict()) if remaining == 0: # emit a closed status on filled statuses where @@ -1050,7 +1061,7 @@ async def deliver_trade_events( # skip duplicate filled updates - we get the deats # from the execution details event msg = BrokerdStatus( - reqid=trade.order.orderId, + reqid=reqid, time_ns=time.time_ns(), # cuz why not account=accounts_def.inverse[trade.order.account], @@ -1067,8 +1078,7 @@ async def deliver_trade_events( broker_details={'name': 'ib'}, ) await ems_stream.send(msg) - continue - + flows.add_msg(reqid, msg.to_dict()) # XXX: for wtv reason this is a separate event type # from IB, not sure why it's needed other then for extra @@ -1199,12 +1209,26 @@ async def deliver_trade_events( ) case 'error': + # NOTE: see impl deats in + # `Client.inline_errors()::push_err()` err: dict = item - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + code: int = err['error_code'] + if code in { + 200, # uhh + + # hist pacing / connectivity + 162, + 165, + + # 'No market data during competing live session' + 1669, + }: + continue + + reqid: str = err['reqid'] + acnt: str = flows.get(reqid)['account'] + reason: str = err['reason'] if err['reqid'] == -1: log.error(f'TWS external order error:\n{pformat(err)}') @@ -1213,14 +1237,27 @@ async def deliver_trade_events( # so we need some further filtering logic here.. # for most cases the 'status' block above should take # care of this. - # await ems_stream.send(BrokerdStatus( - # status='error', - # reqid=err['reqid'], - # reason=err['reason'], - # time_ns=time.time_ns(), - # account=accounts_def.inverse[trade.order.account], - # broker_details={'name': 'ib'}, - # )) + await ems_stream.send( + BrokerdStatus( + status='error', + reqid=reqid, + reason=reason, + time_ns=time.time_ns(), + account=acnt, + broker_details={'name': 'ib'}, + ) + ) + + canceled = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), # cuz why not + status='canceled', + reason=reason, + account=acnt, + broker_details={'name': 'ib'}, + ) + await ems_stream.send(canceled) + flows.add_msg(reqid, canceled.to_dict()) case 'event':