diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index c2967d1b..a8484941 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -44,6 +44,7 @@ from ib_insync.order import ( from ib_insync.objects import ( Fill, Execution, + CommissionReport, ) from ib_insync.objects import Position import pendulum @@ -214,19 +215,35 @@ async def recv_trade_updates( # sync with trio task to_trio.send_nowait(None) - def push_tradesies(eventkit_obj, obj, fill=None): - """Push events to trio task. + def push_tradesies( + eventkit_obj, + obj, + fill: Optional[Fill] = None, + report: Optional[CommissionReport] = None, + ): + ''' + Push events to trio task. - """ - if fill is not None: - # execution details event - item = ('fill', (obj, fill)) + ''' + match eventkit_obj.name(): - elif eventkit_obj.name() == 'positionEvent': - item = ('position', obj) + case 'orderStatusEvent': + item = ('status', obj) - else: - item = ('status', obj) + case 'commissionReportEvent': + assert report + item = ('cost', report) + + case 'execDetailsEvent': + # execution details event + item = ('fill', (obj, fill)) + + case 'positionEvent': + item = ('position', obj) + + case _: + log.error(f'Error unknown event {obj}') + return log.info(f'eventkit event ->\n{pformat(item)}') @@ -242,15 +259,15 @@ async def recv_trade_updates( 'execDetailsEvent', # all "fill" updates 'positionEvent', # avg price updates per symbol per account - # 'commissionReportEvent', # XXX: ugh, it is a separate event from IB and it's # emitted as follows: # self.ib.commissionReportEvent.emit(trade, fill, report) + 'commissionReportEvent', # XXX: not sure yet if we need these # 'updatePortfolioEvent', - # XXX: these all seem to be weird ib_insync intrernal + # XXX: these all seem to be weird ib_insync internal # events that we probably don't care that much about # given the internal design is wonky af.. # 'newOrderEvent', @@ -267,7 +284,7 @@ async def recv_trade_updates( async def update_ledger_from_api_trades( - trade_entries: dict[str, Any], + trade_entries: list[dict[str, Any]], ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg client: Union[Client, MethodProxy], @@ -277,14 +294,6 @@ async def update_ledger_from_api_trades( # LIFO style breakeven pricing calcs. conf = get_config() - # retreive new trade executions from the last session - # and/or day's worth of trading and convert into trade - # records suitable for a local ledger file. - # trades_by_account: dict = {} - # for client in clients: - - # trade_entries = await client.trades() - # XXX; ERRGGG.. # pack in the "primary/listing exchange" value from a # contract lookup since it seems this isn't available by @@ -295,8 +304,13 @@ async def update_ledger_from_api_trades( pexch = condict['primaryExchange'] if not pexch: - con = (await client.get_con(conid=conid))[0] - pexch = con.primaryExchange + cons = await client.get_con(conid=conid) + if cons: + con = cons[0] + pexch = con.primaryExchange or con.exchange + else: + # for futes it seems like the primary is always empty? + pexch = condict['exchange'] entry['listingExchange'] = pexch @@ -304,38 +318,29 @@ async def update_ledger_from_api_trades( conf['accounts'].inverse, trade_entries, ) - # trades_by_account.update(records) + actives = {} # write recent session's trades to the user's (local) ledger file. for acctid, trades_by_id in records.items(): with pp.open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) - # (incrementally) update the user's pps in mem and - # in the `pps.toml`. + # normalize records = norm_trade_records(trades_by_id) - # remap stupid ledger fqsns (which are often - # filled with lesser venue/exchange values) to - # the ones we pull from the API via ib's reported - # positioning messages. - for r in records: - normed_msg = ib_pp_msgs[r.bsuid] - if normed_msg.symbol != r.fqsn: - log.warning( - f'Remapping ledger fqsn: {r.fqsn} -> {normed_msg.symbol}' - ) - r.fqsn = normed_msg.symbol - + # (incrementally) update the user's pps in mem and + # in the `pps.toml`. active = pp.update_pps_conf('ib', acctid, records) + actives.update(active) - return active + return actives async def update_and_audit( by_fqsn: dict[str, pp.Position], cids2pps: dict[int, BrokerdPosition], + validate: bool = False, ) -> list[BrokerdPosition]: @@ -369,21 +374,24 @@ async def update_and_audit( size=p.size, avg_price=p.avg_price, ) - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize + msgs.append(msg) - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if diff: - raise ValueError( - f'POSITION MISMATCH ib <-> piker ledger:\n' - f'ib: {msg}\n' - f'piker: {ibppmsg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' - ) - msg.size = ibsize + if validate: + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {ibppmsg}\n' + f'piker: {msg}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize if ibppmsg.avg_price != msg.avg_price: @@ -395,8 +403,6 @@ async def update_and_audit( f'piker, LIFO breakeven PnL price: {msg.avg_price}' ) - msgs.append(msg) - return msgs @@ -449,8 +455,8 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - cids2pps = {} - used_accounts = set() + cids2pps: dict[str, BrokerdPosition] = {} + active_accts: set[str] = set() # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses @@ -461,7 +467,7 @@ async def trades_dialogue( for pos in client.positions(): cid, msg = pack_position(pos) acctid = msg.account = accounts_def.inverse[msg.account] - used_accounts.add(acctid) + active_accts.add(acctid) cids2pps[cid] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') @@ -469,8 +475,9 @@ async def trades_dialogue( # update trades ledgers for all accounts from # connected api clients. for account, proxy in proxies.items(): + trades = await proxy.trades() await update_ledger_from_api_trades( - await proxy.trades(), + trades, cids2pps, # pass these in to map to correct fqsns.. proxy, ) @@ -478,10 +485,10 @@ async def trades_dialogue( # load all positions from `pps.toml`, cross check with ib's # positions data, and relay re-formatted pps as msgs to the ems. for acctid, by_fqsn in pp.get_pps( - 'ib', acctids=used_accounts, + 'ib', acctids=active_accts, ).items(): - msgs = await update_and_audit(by_fqsn, cids2pps) + msgs = await update_and_audit(by_fqsn, cids2pps, validate=True) all_positions.extend(msg.dict() for msg in msgs) if not all_positions and cids2pps: @@ -512,174 +519,278 @@ async def trades_dialogue( deliver_trade_events, stream, ems_stream, - accounts_def + accounts_def, + cids2pps, + proxies, ) # block until cancelled await trio.sleep_forever() +async def emit_pp_update( + ems_stream: tractor.MsgStream, + trade_entry: dict, + accounts_def: bidict, + proxies: dict, + cids2pps: dict, + +) -> None: + + # compute and relay incrementally updated piker pp + acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] + proxy = proxies[acctid] + await update_ledger_from_api_trades( + [trade_entry], + cids2pps, # pass these in to map to correct fqsns.. + proxy, + ) + # load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as + # msgs to the ems. + for acctid, by_fqsn in pp.get_pps( + 'ib', + acctids={acctid}, + ).items(): + + # should only be one right? + msgs = await update_and_audit( + by_fqsn, + cids2pps, + validate=False, + ) + for msg in msgs: + await ems_stream.send(msg.dict()) + + async def deliver_trade_events( trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, accounts_def: dict[str, str], + cids2pps: dict[str, BrokerdPosition], + proxies: dict[str, MethodProxy], ) -> None: - '''Format and relay all trade events for a given client to the EMS. + ''' + Format and relay all trade events for a given client to emsd. ''' action_map = {'BOT': 'buy', 'SLD': 'sell'} + ids2fills: dict[str, dict] = {} # TODO: for some reason we can receive a ``None`` here when the # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... async for event_name, item in trade_event_stream: - log.info(f'ib sending {event_name}:\n{pformat(item)}') - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) + match event_name: + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], - if event_name == 'status': + case 'status': - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - account=accounts_def.inverse[trade.order.account], + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], - # everyone doin camel case.. - status=status.status.lower(), # force lower case + # everyone doin camel case.. + status=status.status.lower(), # force lower case - filled=status.filled, - reason=status.whyHeld, + filled=status.filled, + reason=status.whyHeld, - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - broker_details={'name': 'ib'}, - ) + broker_details={'name': 'ib'}, + ) + await ems_stream.send(msg.dict()) - elif event_name == 'fill': + case 'fill': - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } + # TODO: maybe we can use matching to better handle these cases. + trade, fill = item + execu: Execution = fill.execution + execid = execu.execId - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + # TODO: + # - normalize out commissions details? + # - this is the same as the unpacking loop above in + # ``trades_to_records()`` no? + trade_entry = ids2fills.setdefault(execid, {}) + cost_already_rx = bool(trade_entry) - action=action_map[execu.side], - size=execu.shares, - price=execu.price, + # if the costs report was already received this + # should be not empty right? + comms = fill.commissionReport.commission + if cost_already_rx: + assert comms - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], + trade_entry.update( + { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissionReport': asdict(fill.commissionReport), + # supposedly server fill time? + 'broker_time': execu.time, + 'name': 'ib', + } + ) - ) + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - elif event_name == 'error': + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - err: dict = item + broker_details=trade_entry, + # XXX: required by order mode currently + broker_time=trade_entry['broker_time'], - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + ) + await ems_stream.send(msg.dict()) - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + # 2 cases: + # - fill comes first or + # - comms report comes first + comms = fill.commissionReport.commission + if comms: + # UGHHH since the commision report object might be + # filled in **after** we already serialized to dict.. + # def need something better for all this. + trade_entry.update( + {'commissionReport': asdict(fill.commissionReport)} + ) - # TODO: what schema for this msg if we're going to make it - # portable across all backends? - # msg = BrokerdError(**err) - continue + if comms or cost_already_rx: + # only send a pp update once we have a cost report + print("EMITTING PP") + await emit_pp_update( + ems_stream, + trade_entry, + accounts_def, + proxies, + cids2pps, + ) - elif event_name == 'position': + case 'cost': - cid, msg = pack_position(item) - msg.account = accounts_def.inverse[msg.account] + cr: CommissionReport = item + execid = cr.execId - elif event_name == 'event': + trade_entry = ids2fills.setdefault(execid, {}) + fill_already_rx = bool(trade_entry) - # it's either a general system status event or an external - # trade event? - log.info(f"TWS system status: \n{pformat(item)}") + # no fill msg has arrived yet so just fill out the + # cost report for now and when the fill arrives a pp + # msg can be emitted. + trade_entry.update( + {'commissionReport': asdict(cr)} + ) - # TODO: support this again but needs parsing at the callback - # level... - # reqid = item.get('reqid', 0) - # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + if fill_already_rx: + print("EMITTING PP") + await emit_pp_update( + ems_stream, + trade_entry, + accounts_def, + proxies, + cids2pps, + ) - continue + case 'error': + err: dict = item - # msg.reqid = 'tws-' + str(-1 * reqid) + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - # msg.broker_details['external_src'] = 'tws' + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + + case 'position': + + cid, msg = pack_position(item) + # acctid = msg.account = accounts_def.inverse[msg.account] + # cuck ib and it's shitty fifo sys for pps! + # await ems_stream.send(msg.dict()) + + case 'event': + + # it's either a general system status event or an external + # trade event? + log.info(f"TWS system status: \n{pformat(item)}") + + # TODO: support this again but needs parsing at the callback + # level... + # reqid = item.get('reqid', 0) + # if getattr(msg, 'reqid', 0) < -1: + # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + # msg.reqid = 'tws-' + str(-1 * reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + # msg.broker_details['external_src'] = 'tws' + + case _: + log.error(f'WTF: {event_name}: {item}') def norm_trade_records( @@ -814,6 +925,11 @@ def trades_to_records( case 'contract' | 'execution' | 'commissionReport': # sub-dict cases entry.update(val) + + case 'time': + # ib has wack ns timestamps, or is that us? + continue + case _: entry[section] = val