diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index c032870f..8f3ec44c 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -26,6 +26,7 @@ from typing import ( Any, Optional, AsyncIterator, + Union, ) from bidict import bidict @@ -48,8 +49,7 @@ from ib_insync.objects import Position import pendulum from piker import config -from piker.pp import update_pps_conf -from piker.pp import TradeRecord +from piker import pp from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -68,6 +68,7 @@ from .api import ( get_config, open_client_proxies, Client, + MethodProxy, ) # from .feed import open_data_client @@ -87,27 +88,30 @@ def pack_position( symbol = con.symbol.lower() exch = (con.primaryExchange or con.exchange).lower() - symkey = '.'.join((symbol, exch)) + fqsn = '.'.join((symbol, exch)) if not exch: # attempt to lookup the symbol from our # hacked set.. for sym in _adhoc_futes_set: if symbol in sym: - symkey = sym + fqsn = sym break expiry = con.lastTradeDateOrContractMonth if expiry: - symkey += f'.{expiry}' + fqsn += f'.{expiry}' # TODO: options contracts into a sane format.. - return BrokerdPosition( - broker='ib', - account=pos.account, - symbol=symkey, - currency=con.currency, - size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + return ( + con.conId, + BrokerdPosition( + broker='ib', + account=pos.account, + symbol=fqsn, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ), ) @@ -262,6 +266,70 @@ async def recv_trade_updates( await client.ib.disconnectedEvent +async def update_ledger_from_api_trades( + clients: list[Union[Client, MethodProxy]], + ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg + +) -> dict[str, Any]: + + # construct piker pps from trade ledger, underneath using + # LIFO style breakeven pricing calcs. + conf = get_config() + + # retreive new trade executions from the last session + # and/or day's worth of trading and convert into trade + # records suitable for a local ledger file. + trades_by_account: dict = {} + for client in clients: + + trade_entries = await client.trades() + + # XXX; ERRGGG.. + # pack in the "primary/listing exchange" value from a + # contract lookup since it seems this isn't available by + # default from the `.fills()` method endpoint... + for entry in trade_entries: + condict = entry['contract'] + conid = condict['conId'] + pexch = condict['primaryExchange'] + + if not pexch: + con = (await client.get_con(conid=conid))[0] + pexch = con.primaryExchange + + entry['listingExchange'] = pexch + + records = trades_to_records( + conf['accounts'].inverse, + trade_entries, + ) + trades_by_account.update(records) + + # write recent session's trades to the user's (local) ledger file. + for acctid, trades_by_id in trades_by_account.items(): + + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + + # (incrementally) update the user's pps in mem and + # in the `pps.toml`. + records = norm_trade_records(trades_by_id) + + # remap stupid ledger fqsns (which are often + # filled with lesser venue/exchange values) to + # the ones we pull from the API via ib's reported + # positioning messages. + for r in records: + normed_msg = ib_pp_msgs[r.bsuid] + if normed_msg.symbol != r.fqsn: + log.warning( + f'Remapping ledger fqsn: {r.fqsn} -> {normed_msg.symbol}' + ) + r.fqsn = normed_msg.symbol + + pp.update_pps_conf('ib', acctid, records) + + @tractor.context async def trades_dialogue( @@ -311,7 +379,7 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - pp_msgs = {} + cids2pps = {} # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses @@ -320,65 +388,65 @@ async def trades_dialogue( # money.. xb for client in aioclients.values(): for pos in client.positions(): - msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - pp_msgs[msg.symbol] = msg + cid, msg = pack_position(pos) + msg.account = accounts_def.inverse[msg.account] + cids2pps[cid] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') - # built-out piker pps from trade ledger, underneath using - # LIFO style breakeven pricing calcs. - trades_by_account: dict = {} - conf = get_config() + # update trades ledgers for all accounts from + # connected api clients. + await update_ledger_from_api_trades( + proxies.values(), + cids2pps, # pass these in to map to correct fqsns.. + ) - # retreive new trade executions from the last session - # and/or day's worth of trading and convert into trade - # records suitable for a local ledger file. - for proxy in proxies.values(): - trade_entries = await proxy.trades() - records = trades_to_records( - conf['accounts'].inverse, - trade_entries, - ) - trades_by_account.update(records) - - # write recent session's trades to the user's (local) ledger - # file. - for acctid, trades_by_id in trades_by_account.items(): - with config.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) - - # (incrementally) update the user's pps in mem and - # in the `pps.toml`. - records = norm_trade_records(trades_by_id) - active = update_pps_conf('ib', acctid, records) - - # relay re-formatted pps as msgs to the ems. - for fqsn, pp in active.items(): - - ibppmsg = pp_msgs[fqsn.rstrip('.ib')] + # load all positions from `pps.toml`, cross check with ib's + # positions data, and relay re-formatted pps as msgs to the ems. + for acctid, by_fqsn in pp.get_pps('ib').items(): + for fqsn, posdict in by_fqsn.items(): + ibppmsg = cids2pps[posdict['bsuid']] msg = BrokerdPosition( broker='ib', # account=acctid + '.ib', # XXX: ok so this is annoying, we're relaying # an account name with the backend suffix prefixed - # but when reading accounts from ledgers + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. account=ibppmsg.account, # XXX: the `.ib` is stripped..? symbol=ibppmsg.symbol, currency=ibppmsg.currency, - size=pp['size'], - avg_price=pp['avg_price'], + size=posdict['size'], + avg_price=posdict['avg_price'], ) - assert ibppmsg.size == msg.size + print(msg) + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {ibsize}\n' + f'piker: {pikersize}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize + if ibppmsg.avg_price != msg.avg_price: + # TODO: make this a "propoganda" log level? log.warning( 'The mega-cucks at IB want you to believe with their ' - '"FIFO" positioning the following:\n' + f'"FIFO" positioning for {msg.symbol}:\n' f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' - f'piker, legitamous-ness, LIFO avg price: {msg.avg_price}' + f'piker, LIFO breakeven PnL price: {msg.avg_price}' ) all_positions.append(msg.dict()) @@ -545,7 +613,7 @@ async def deliver_trade_events( continue elif event_name == 'position': - msg = pack_position(item) + cid, msg = pack_position(item) msg.account = accounts_def.inverse[msg.account] elif event_name == 'event': @@ -579,25 +647,49 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> dict[str, list[TradeRecord]]: +) -> dict[str, list[pp.Transaction]]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. ''' - records: list[TradeRecord] = [] - # async with open_data_client() as proxy: + records: list[pp.Transaction] = [] + for tid, record in ledger.items(): # date, time = record['dateTime'] # cost = record['cost'] # action = record['buySell'] conid = record.get('conId') or record['conid'] - comms = record.get('ibCommission', 0) + comms = record.get('commission') or -1*record['ibCommission'] price = record.get('price') or record['tradePrice'] - size = record.get('shares') or record['quantity'] + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. are you fucking serious ib...!? + size = record.get('quantity') or record['shares'] * { + 'BOT': 1, + 'SLD': -1, + }[record['side']] + + exch = record['exchange'] + lexch = record.get('listingExchange') + + suffix = lexch or exch symbol = record['symbol'] + # likely an opts contract record from a flex report.. + # TODO: no idea how to parse ^ the strike part from flex.. + # (00010000 any, or 00007500 tsla, ..) + # we probably must do the contract lookup for this? + if ' ' in symbol or '--' in exch: + underlying, _, tail = symbol.partition(' ') + suffix = exch = 'opt' + expiry = tail[:6] + # otype = tail[6] + # strike = tail[7:] + + print(f'skipping opts contract {symbol}') + continue + # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. instr = record.get('assetCategory') @@ -605,15 +697,16 @@ def norm_trade_records( symbol = record['description'][:3] # try to build out piker fqsn from record. - expiry = record.get('lastTradeDateOrContractMonth') or record['expiry'] - exch = record.get('listingExchange') or record['exchange'] + expiry = record.get( + 'lastTradeDateOrContractMonth') or record.get('expiry') + if expiry: + expiry = str(expiry).strip(' ') + suffix = f'{exch}.{expiry}' - fqsn = Symbol.from_broker_info( - broker='ib', - symbol=symbol, - suffix=f'{exch}.{expiry}', + fqsn = Symbol.from_fqsn( + fqsn=f'{symbol}.{suffix}.ib', info={}, - ).front_fqsn() + ).front_fqsn().rstrip('.ib') # NOTE: for flex records the normal fields won't be available so # we have to do a lookup at some point to reverse map the conid @@ -621,41 +714,50 @@ def norm_trade_records( # con = await proxy.get_con(conid) - records.append(TradeRecord( + records.append(pp.Transaction( fqsn=fqsn, tid=tid, size=size, price=price, cost=comms, - symkey=conid, + bsuid=conid, )) return records def trades_to_records( - accounts: bidict, trade_entries: list[object], source_type: str = 'api', ) -> dict: + ''' + Convert either of API execution objects or flex report + entry objects into ``dict`` form, pretty much straight up + without modification. + ''' trades_by_account = {} for t in trade_entries: if source_type == 'flex': entry = t.__dict__ + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave a table key as an `int`? So i guess + # cast to strs for all keys.. + # oddly for some so-called "BookTrade" entries # this field seems to be blank, no cuckin clue. # trade['ibExecID'] - - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - tid = str(entry['tradeID']) + tid = str(entry.get('ibExecID') or entry['tradeID']) # date = str(entry['tradeDate']) + + # XXX: is it going to cause problems if a account name + # get's lost? The user should be able to find it based + # on the actual exec history right? acctid = accounts[str(entry['accountId'])] elif source_type == 'api': @@ -667,17 +769,19 @@ def trades_to_records( # 'time': 1654801166.0 # } + # flatten all sub-dicts and values into one top level entry. entry = {} - for section, obj in t.items(): + for section, val in t.items(): match section: - case 'commisionReport' | 'execution': - entry.update(asdict(obj)) - - case 'contract': - entry.update(obj) + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) + case _: + entry[section] = val tid = str(entry['execId']) dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? entry['date'] = str(dt) acctid = accounts[entry['acctNumber']] @@ -691,7 +795,7 @@ def trades_to_records( def load_flex_trades( path: Optional[str] = None, -) -> dict[str, str]: +) -> dict[str, Any]: from ib_insync import flexreport, util @@ -704,10 +808,10 @@ def load_flex_trades( token = conf.get('flex_token') if not token: raise ValueError( - 'You must specify a ``flex_token`` field in your' - '`brokers.toml` in order load your trade log, see our' - 'intructions for how to set this up here:\n' - 'PUT LINK HERE!' + 'You must specify a ``flex_token`` field in your' + '`brokers.toml` in order load your trade log, see our' + 'intructions for how to set this up here:\n' + 'PUT LINK HERE!' ) qid = conf['flex_trades_query_id'] @@ -728,6 +832,10 @@ def load_flex_trades( report = flexreport.FlexReport(path=path) trade_entries = report.extract('Trade') + ln = len(trade_entries) + # log.info(f'Loaded {ln} trades from flex query') + print(f'Loaded {ln} trades from flex query') + trades_by_account = trades_to_records( # get reverse map to user account names conf['accounts'].inverse, @@ -735,13 +843,15 @@ def load_flex_trades( source_type='flex', ) - # ln = len(trades) - # log.info(f'Loaded {ln} trades from flex query') - + ledgers = {} for acctid, trades_by_id in trades_by_account.items(): - with config.open_trade_ledger('ib', acctid) as ledger: + with pp.open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) + ledgers[acctid] = ledger + + return ledgers + if __name__ == '__main__': import sys