From c3686185c1935214e6df6ac1ed19d8b4ca675eea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 27 Mar 2023 14:14:39 -0400 Subject: [PATCH] `ib`: only process ledger-txs once per client Previous we were re-processing all ledgers for every position msg received from the API, per client.. Instead do that once in a first pass and drop all key-miss lookups for `bs_mktid`s; it should never happen. Better typing for in-routine vars, convert pos msg/objects to `dict` prior to logging so it's sane to read on console. Skip processing specifically option contracts for now. --- piker/brokers/ib/broker.py | 73 ++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index fa94044c..03d073fc 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -38,6 +38,7 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, + Option, ) from ib_insync.order import ( Trade, @@ -88,14 +89,17 @@ from .api import ( def pack_position( pos: IbPosition -) -> dict[str, Any]: +) -> tuple[ + str, + dict[str, Any] +]: con = pos.contract fqsn, calc_price = con2fqsn(con) # TODO: options contracts into a sane format.. return ( - con.conId, + str(con.conId), BrokerdPosition( broker='ib', account=pos.account, @@ -383,20 +387,19 @@ async def update_and_audit_msgs( # raise ValueError( log.error( f'POSITION MISMATCH ib <-> piker ledger:\n' - f'ib: {ibppmsg}\n' - f'piker: {msg}\n' - f'reverse_split_ratio: {reverse_split_ratio}\n' - f'split_ratio: {split_ratio}\n\n' 'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' 'If you are expecting a (reverse) split in this ' - 'instrument you should probably put the following ' + 'instrument you should probably put the following\n\n' f'in the `pps.toml` section:\n{entry}' + f'IB:\nm{ibppmsg.to_dict()}\n\n' + f'PIKER:\n{msg.to_dict()}\n\n' + # f'reverse_split_ratio: {reverse_split_ratio}\n' + # f'split_ratio: {split_ratio}\n\n' ) msg.size = ibsize if ibppmsg.avg_price != msg.avg_price: - - # TODO: make this a "propoganda" log level? + # TODO: make this a "propaganda" log level? log.warning( 'The mega-cucks at IB want you to believe with their ' f'"FIFO" positioning for {msg.symbol}:\n' @@ -425,10 +428,10 @@ async def update_and_audit_msgs( if validate and p.size: # raise ValueError( log.error( - f'UNEXPECTED POSITION says ib:\n' - f'piker: {msg}\n' + f'UNEXPECTED POSITION says IB:\n' 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' - 'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?' + 'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?\n' + f'PIKER:\n{msg.to_dict()}\n' ) msgs.append(msg) @@ -611,6 +614,8 @@ async def trades_dialogue( # api clients which report trades for **this session**. trades = await proxy.trades() if trades: + trans_by_acct: dict[str, Transaction] + api_to_ledger_entries: dict[str, dict] ( trans_by_acct, api_to_ledger_entries, @@ -637,17 +642,30 @@ async def trades_dialogue( if trans: table.update_from_trans(trans) + trans = norm_trade_records(ledger) + table.update_from_trans(trans) + # process pp value reported from ib's system. we only # use these to cross-check sizing since average pricing # on their end uses the so called (bs) "FIFO" style # which more or less results in a price that's not # useful for traders who want to not lose money.. xb + # -> collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + pos: IbPosition # named tuple actually for pos in client.positions(): - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - bs_mktid, msg = pack_position(pos) + # NOTE XXX: we skip options for now since we don't + # yet support the symbology nor the live feeds. + if isinstance(pos.contract, Option): + log.warning( + f'Option contracts not supported for now:\n' + f'{pos._asdict()}' + ) + continue + + bs_mktid, msg = pack_position(pos) acctid = msg.account = accounts_def.inverse[msg.account] acctid = acctid.strip('ib.') cids2pps[(acctid, bs_mktid)] = msg @@ -663,29 +681,6 @@ async def trades_dialogue( not pp or pp.size != msg.size ): - trans = norm_trade_records(ledger) - table.update_from_trans(trans) - - # XXX: not sure exactly why it wouldn't be in - # the updated output (maybe this is a bug?) but - # if you create a pos from TWS and then load it - # from the api trades it seems we get a key - # error from ``update[bs_mktid]`` ? - pp = table.pps.get(bs_mktid) - if not pp: - log.error( - f'The contract id for {msg} may have ' - f'changed to {bs_mktid}\nYou may need to ' - 'adjust your ledger for this, skipping ' - 'for now.' - ) - continue - - # XXX: not sure exactly why it wouldn't be in - # the updated output (maybe this is a bug?) but - # if you create a pos from TWS and then load it - # from the api trades it seems we get a key - # error from ``update[bs_mktid]`` ? pp = table.pps[bs_mktid] pairinfo = pp.symbol if msg.size != pp.size: