From 82b718d5a35e76e5d6082759c0f023ce0e83fde0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 14 Jun 2022 16:23:46 -0400 Subject: [PATCH] Many, many `ib` trade log schema hackz I don't want to rant too much any more since it's pretty clear `ib` has either zero concern for its (api) users or a severely terrible data management team and/or general inter-team coordination system, but this patch more or less hacks the flex report records to be similar enough to API "execution" / "fill" records such that they can be similarly normalized and stored as well as processed for position calculations.. Dirty deats, - use the `IB.fills()` method for pulling current session trade events since it's both recommended in the docs and does seem to capture more extensive meta-data. - add an `update_ledger_from_api_trades()` helper which does all the insane work of making sure api trade entries are usable both within piker's global fqsn system and compatible with incremental updates of positions computed from trade ledgers derived from ib's "flex reports". - add "auditing" of `ib`'s reported positioning API messages by comparison with piker's new "traders first" breakeven price style and complain via logging on mismatches. - handle buy vs. sell arithmetic (via a +ve or -ve multiplier) to make "size" arithmetic work for API trade entries.. - draft out options contract transaction parsing but skip in pps generation for now. - always use the "execution id" as ledger keys both in flex and api trade processing. - for whatever weird reason `ib_insync` doesn't include the so-called "primary exchange" in contracts reported in fill events, so do manual contract lookups in such cases such that pps entries can be placed in the right fqsn section... Still ToDo: - incremental update on trade clears / position updates - pps audit from ledger depending on user config? 
--- piker/brokers/ib/broker.py | 286 +++++++++++++++++++++++++------------ 1 file changed, 198 insertions(+), 88 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index c032870f..8f3ec44c 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -26,6 +26,7 @@ from typing import ( Any, Optional, AsyncIterator, + Union, ) from bidict import bidict @@ -48,8 +49,7 @@ from ib_insync.objects import Position import pendulum from piker import config -from piker.pp import update_pps_conf -from piker.pp import TradeRecord +from piker import pp from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -68,6 +68,7 @@ from .api import ( get_config, open_client_proxies, Client, + MethodProxy, ) # from .feed import open_data_client @@ -87,27 +88,30 @@ def pack_position( symbol = con.symbol.lower() exch = (con.primaryExchange or con.exchange).lower() - symkey = '.'.join((symbol, exch)) + fqsn = '.'.join((symbol, exch)) if not exch: # attempt to lookup the symbol from our # hacked set.. for sym in _adhoc_futes_set: if symbol in sym: - symkey = sym + fqsn = sym break expiry = con.lastTradeDateOrContractMonth if expiry: - symkey += f'.{expiry}' + fqsn += f'.{expiry}' # TODO: options contracts into a sane format.. 
- return BrokerdPosition( - broker='ib', - account=pos.account, - symbol=symkey, - currency=con.currency, - size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + return ( + con.conId, + BrokerdPosition( + broker='ib', + account=pos.account, + symbol=fqsn, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ), ) @@ -262,6 +266,70 @@ async def recv_trade_updates( await client.ib.disconnectedEvent +async def update_ledger_from_api_trades( + clients: list[Union[Client, MethodProxy]], + ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg + +) -> dict[str, Any]: + + # construct piker pps from trade ledger, underneath using + # LIFO style breakeven pricing calcs. + conf = get_config() + + # retreive new trade executions from the last session + # and/or day's worth of trading and convert into trade + # records suitable for a local ledger file. + trades_by_account: dict = {} + for client in clients: + + trade_entries = await client.trades() + + # XXX; ERRGGG.. + # pack in the "primary/listing exchange" value from a + # contract lookup since it seems this isn't available by + # default from the `.fills()` method endpoint... + for entry in trade_entries: + condict = entry['contract'] + conid = condict['conId'] + pexch = condict['primaryExchange'] + + if not pexch: + con = (await client.get_con(conid=conid))[0] + pexch = con.primaryExchange + + entry['listingExchange'] = pexch + + records = trades_to_records( + conf['accounts'].inverse, + trade_entries, + ) + trades_by_account.update(records) + + # write recent session's trades to the user's (local) ledger file. + for acctid, trades_by_id in trades_by_account.items(): + + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + + # (incrementally) update the user's pps in mem and + # in the `pps.toml`. 
+ records = norm_trade_records(trades_by_id) + + # remap stupid ledger fqsns (which are often + # filled with lesser venue/exchange values) to + # the ones we pull from the API via ib's reported + # positioning messages. + for r in records: + normed_msg = ib_pp_msgs[r.bsuid] + if normed_msg.symbol != r.fqsn: + log.warning( + f'Remapping ledger fqsn: {r.fqsn} -> {normed_msg.symbol}' + ) + r.fqsn = normed_msg.symbol + + pp.update_pps_conf('ib', acctid, records) + + @tractor.context async def trades_dialogue( @@ -311,7 +379,7 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - pp_msgs = {} + cids2pps = {} # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses @@ -320,65 +388,65 @@ async def trades_dialogue( # money.. xb for client in aioclients.values(): for pos in client.positions(): - msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - pp_msgs[msg.symbol] = msg + cid, msg = pack_position(pos) + msg.account = accounts_def.inverse[msg.account] + cids2pps[cid] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') - # built-out piker pps from trade ledger, underneath using - # LIFO style breakeven pricing calcs. - trades_by_account: dict = {} - conf = get_config() + # update trades ledgers for all accounts from + # connected api clients. + await update_ledger_from_api_trades( + proxies.values(), + cids2pps, # pass these in to map to correct fqsns.. + ) - # retreive new trade executions from the last session - # and/or day's worth of trading and convert into trade - # records suitable for a local ledger file. - for proxy in proxies.values(): - trade_entries = await proxy.trades() - records = trades_to_records( - conf['accounts'].inverse, - trade_entries, - ) - trades_by_account.update(records) - - # write recent session's trades to the user's (local) ledger - # file. 
- for acctid, trades_by_id in trades_by_account.items(): - with config.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) - - # (incrementally) update the user's pps in mem and - # in the `pps.toml`. - records = norm_trade_records(trades_by_id) - active = update_pps_conf('ib', acctid, records) - - # relay re-formatted pps as msgs to the ems. - for fqsn, pp in active.items(): - - ibppmsg = pp_msgs[fqsn.rstrip('.ib')] + # load all positions from `pps.toml`, cross check with ib's + # positions data, and relay re-formatted pps as msgs to the ems. + for acctid, by_fqsn in pp.get_pps('ib').items(): + for fqsn, posdict in by_fqsn.items(): + ibppmsg = cids2pps[posdict['bsuid']] msg = BrokerdPosition( broker='ib', # account=acctid + '.ib', # XXX: ok so this is annoying, we're relaying # an account name with the backend suffix prefixed - # but when reading accounts from ledgers + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. account=ibppmsg.account, # XXX: the `.ib` is stripped..? symbol=ibppmsg.symbol, currency=ibppmsg.currency, - size=pp['size'], - avg_price=pp['avg_price'], + size=posdict['size'], + avg_price=posdict['avg_price'], ) - assert ibppmsg.size == msg.size + print(msg) + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {ibsize}\n' + f'piker: {pikersize}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize + if ibppmsg.avg_price != msg.avg_price: + # TODO: make this a "propoganda" log level? 
log.warning( 'The mega-cucks at IB want you to believe with their ' - '"FIFO" positioning the following:\n' + f'"FIFO" positioning for {msg.symbol}:\n' f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' - f'piker, legitamous-ness, LIFO avg price: {msg.avg_price}' + f'piker, LIFO breakeven PnL price: {msg.avg_price}' ) all_positions.append(msg.dict()) @@ -545,7 +613,7 @@ async def deliver_trade_events( continue elif event_name == 'position': - msg = pack_position(item) + cid, msg = pack_position(item) msg.account = accounts_def.inverse[msg.account] elif event_name == 'event': @@ -579,25 +647,49 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> dict[str, list[TradeRecord]]: +) -> dict[str, list[pp.Transaction]]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. ''' - records: list[TradeRecord] = [] - # async with open_data_client() as proxy: + records: list[pp.Transaction] = [] + for tid, record in ledger.items(): # date, time = record['dateTime'] # cost = record['cost'] # action = record['buySell'] conid = record.get('conId') or record['conid'] - comms = record.get('ibCommission', 0) + comms = record.get('commission') or -1*record['ibCommission'] price = record.get('price') or record['tradePrice'] - size = record.get('shares') or record['quantity'] + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. are you fucking serious ib...!? + size = record.get('quantity') or record['shares'] * { + 'BOT': 1, + 'SLD': -1, + }[record['side']] + + exch = record['exchange'] + lexch = record.get('listingExchange') + + suffix = lexch or exch symbol = record['symbol'] + # likely an opts contract record from a flex report.. + # TODO: no idea how to parse ^ the strike part from flex.. + # (00010000 any, or 00007500 tsla, ..) + # we probably must do the contract lookup for this? 
+ if ' ' in symbol or '--' in exch: + underlying, _, tail = symbol.partition(' ') + suffix = exch = 'opt' + expiry = tail[:6] + # otype = tail[6] + # strike = tail[7:] + + print(f'skipping opts contract {symbol}') + continue + # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. instr = record.get('assetCategory') @@ -605,15 +697,16 @@ def norm_trade_records( symbol = record['description'][:3] # try to build out piker fqsn from record. - expiry = record.get('lastTradeDateOrContractMonth') or record['expiry'] - exch = record.get('listingExchange') or record['exchange'] + expiry = record.get( + 'lastTradeDateOrContractMonth') or record.get('expiry') + if expiry: + expiry = str(expiry).strip(' ') + suffix = f'{exch}.{expiry}' - fqsn = Symbol.from_broker_info( - broker='ib', - symbol=symbol, - suffix=f'{exch}.{expiry}', + fqsn = Symbol.from_fqsn( + fqsn=f'{symbol}.{suffix}.ib', info={}, - ).front_fqsn() + ).front_fqsn().rstrip('.ib') # NOTE: for flex records the normal fields won't be available so # we have to do a lookup at some point to reverse map the conid @@ -621,41 +714,50 @@ def norm_trade_records( # con = await proxy.get_con(conid) - records.append(TradeRecord( + records.append(pp.Transaction( fqsn=fqsn, tid=tid, size=size, price=price, cost=comms, - symkey=conid, + bsuid=conid, )) return records def trades_to_records( - accounts: bidict, trade_entries: list[object], source_type: str = 'api', ) -> dict: + ''' + Convert either of API execution objects or flex report + entry objects into ``dict`` form, pretty much straight up + without modification. + ''' trades_by_account = {} for t in trade_entries: if source_type == 'flex': entry = t.__dict__ + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave a table key as an `int`? So i guess + # cast to strs for all keys.. + # oddly for some so-called "BookTrade" entries # this field seems to be blank, no cuckin clue. 
# trade['ibExecID'] - - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - tid = str(entry['tradeID']) + tid = str(entry.get('ibExecID') or entry['tradeID']) # date = str(entry['tradeDate']) + + # XXX: is it going to cause problems if a account name + # get's lost? The user should be able to find it based + # on the actual exec history right? acctid = accounts[str(entry['accountId'])] elif source_type == 'api': @@ -667,17 +769,19 @@ def trades_to_records( # 'time': 1654801166.0 # } + # flatten all sub-dicts and values into one top level entry. entry = {} - for section, obj in t.items(): + for section, val in t.items(): match section: - case 'commisionReport' | 'execution': - entry.update(asdict(obj)) - - case 'contract': - entry.update(obj) + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) + case _: + entry[section] = val tid = str(entry['execId']) dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? entry['date'] = str(dt) acctid = accounts[entry['acctNumber']] @@ -691,7 +795,7 @@ def trades_to_records( def load_flex_trades( path: Optional[str] = None, -) -> dict[str, str]: +) -> dict[str, Any]: from ib_insync import flexreport, util @@ -704,10 +808,10 @@ def load_flex_trades( token = conf.get('flex_token') if not token: raise ValueError( - 'You must specify a ``flex_token`` field in your' - '`brokers.toml` in order load your trade log, see our' - 'intructions for how to set this up here:\n' - 'PUT LINK HERE!' + 'You must specify a ``flex_token`` field in your' + '`brokers.toml` in order load your trade log, see our' + 'intructions for how to set this up here:\n' + 'PUT LINK HERE!' 
) qid = conf['flex_trades_query_id'] @@ -728,6 +832,10 @@ def load_flex_trades( report = flexreport.FlexReport(path=path) trade_entries = report.extract('Trade') + ln = len(trade_entries) + # log.info(f'Loaded {ln} trades from flex query') + print(f'Loaded {ln} trades from flex query') + trades_by_account = trades_to_records( # get reverse map to user account names conf['accounts'].inverse, @@ -735,13 +843,15 @@ def load_flex_trades( source_type='flex', ) - # ln = len(trades) - # log.info(f'Loaded {ln} trades from flex query') - + ledgers = {} for acctid, trades_by_id in trades_by_account.items(): - with config.open_trade_ledger('ib', acctid) as ledger: + with pp.open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) + ledgers[acctid] = ledger + + return ledgers + if __name__ == '__main__': import sys