diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 3f6504a1..5c329ecc 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -38,7 +38,10 @@ from .feed import ( open_symbol_search, stream_quotes, ) -from .broker import trades_dialogue +from .broker import ( + trades_dialogue, + norm_trade_records, +) __all__ = [ 'get_client', diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 044415fc..a68ee6fe 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -483,6 +483,14 @@ class Client: return con + async def get_con( + self, + conid: int, + ) -> Contract: + return await self.ib.qualifyContractsAsync( + ibis.Contract(conId=conid) + ) + async def find_contract( self, pattern: str, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index f9fab51d..2792c517 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -28,6 +28,7 @@ from typing import ( AsyncIterator, ) +from bidict import bidict import trio from trio_typing import TaskStatus import tractor @@ -44,8 +45,10 @@ from ib_insync.objects import ( Execution, ) from ib_insync.objects import Position +import pendulum from piker import config +from piker.pp import TradeRecord from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -56,6 +59,7 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) +from piker.data._source import Symbol from .api import ( _accounts2clients, _adhoc_futes_set, @@ -64,6 +68,7 @@ from .api import ( open_client_proxies, Client, ) +# from .feed import open_data_client def pack_position( @@ -95,7 +100,6 @@ def pack_position( symkey += f'.{expiry}' # TODO: options contracts into a sane format.. - return BrokerdPosition( broker='ib', account=pos.account, @@ -317,11 +321,72 @@ async def trades_dialogue( all_positions.append(msg.dict()) - trades: list[dict] = [] + trades_by_account: dict = {} + conf = get_config() for proxy in proxies.values(): - trades.append(await proxy.trades()) + trade_entries = await proxy.trades() + # { + # 'commissionReport': CommissionReport( + # execId='', + # commission=0.0, + # currency='', + # realizedPNL=0.0, + # yield_=0.0, + # yieldRedemptionDate=0), + # 'contract': { + # 'comboLegs': [], + # 'comboLegsDescrip': '', + # 'conId': 477837024, + # 'currency': 'USD', + # 'deltaNeutralContract': None, + # 'exchange': 'GLOBEX', + # 'includeExpired': False, + # 'lastTradeDateOrContractMonth': '20220617', + # 'localSymbol': 'MNQM2', + # 'multiplier': '2', + # 'primaryExchange': '', + # 'right': '?', + # 'secId': '', + # 'secIdType': '', + # 'secType': 'FUT', + # 'strike': 0.0, + # 'symbol': 'MNQ', + # 'tradingClass': 'MNQ' + # }, + # 'execution': Execution( + # execId='0000e1a7.62a2315f.01.01', + # time=1654801166.0, + # acctNumber='DU5612476', + # exchange='GLOBEX', + # side='BOT', + # shares=1.0, + # price=12443.5, + # permId=778998556, + # clientId=6116, + # orderId=555, + # liquidation=0, + # cumQty=1.0, + # avgPrice=12443.5, + # orderRef='', + # evRule='', + # evMultiplier=0.0, + # modelCode='', + # lastLiquidity=1 + # ), + # 'time': 1654801166.0 + # } + trades_by_account.update( + trades_to_records( + conf['accounts'].inverse, + trade_entries, + ) + ) - log.info(f'Loaded {len(trades)} from this session') + for acctid, trades_by_id in trades_by_account.items(): + with config.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + + # log.info(f'Loaded {len(trades)} from this session') # TODO: write trades to local ``trades.toml`` # - use above per-session trades data and write to local file # - get the "flex reports" working and pull historical data and @@ -514,12 +579,114 @@ async def deliver_trade_events( await ems_stream.send(msg.dict()) +def norm_trade_records( + ledger: dict[str, Any], + +) -> dict[str, list[TradeRecord]]: + ''' + Normalize a flex report or API retrieved executions + ledger into our standard record format. + + ''' + records: list[TradeRecord] = [] + # async with open_data_client() as proxy: + for tid, record in ledger.items(): + # date, time = record['dateTime'] + # cost = record['cost'] + # action = record['buySell'] + conid = record.get('conId') or record['conid'] + comms = record.get('ibCommission', 0) + price = record.get('price') or record['tradePrice'] + size = record.get('shares') or record['quantity'] + + symbol = record['symbol'] + + # special handling of symbol extraction from + # flex records using some ad-hoc schema parsing. + instr = record.get('assetCategory') + if instr == 'FUT': + symbol = record['description'][:3] + + # try to build out piker fqsn from record. + expiry = record.get('lastTradeDateOrContractMonth') or record['expiry'] + exch = record.get('listingExchange') or record['exchange'] + + fqsn = Symbol.from_broker_info( + broker='ib', + symbol=symbol, + suffix=f'{exch}.{expiry}', + info={}, + ).front_fqsn() + + # NOTE: for flex records the normal fields won't be available so + # we have to do a lookup at some point to reverse map the conid + # to a fqsn. + + # con = await proxy.get_con(conid) + + records.append(TradeRecord( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + symkey=conid, + )) + + return records + + +def trades_to_records( + accounts: bidict, + trade_entries: list[object], + source_type: str = 'api', + +) -> dict: + + trades_by_account = {} + + for t in trade_entries: + if source_type == 'flex': + entry = t.__dict__ + + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. + # trade['ibExecID'] + + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave this as an ``int``? + tid = str(entry['tradeID']) + # date = str(entry['tradeDate']) + acctid = accounts[str(entry['accountId'])] + + elif source_type == 'api': + entry = {} + for section, obj in t.items(): + match section: + case 'commisionReport' | 'execution': + entry.update(asdict(obj)) + + case 'contract': + entry.update(obj) + + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + entry['date'] = str(dt) + acctid = accounts[entry['acctNumber']] + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + return trades_by_account + + def load_flex_trades( path: Optional[str] = None, ) -> dict[str, str]: - from pprint import pprint from ib_insync import flexreport, util conf = get_config() @@ -555,33 +722,19 @@ def load_flex_trades( report = flexreport.FlexReport(path=path) trade_entries = report.extract('Trade') + trades_by_account = trades_to_records( + # get reverse map to user account names + conf['accounts'].inverse, + trade_entries, + source_type='flex', + ) - # get reverse map to user account names - accounts = conf['accounts'].inverse - trades_by_account = {} - - for t in trade_entries: - - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - trade = t.__dict__ - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - tid = str(trade['tradeID']) - date = str(trade['tradeDate']) - acctid = accounts[str(trade['accountId'])] - trades_by_account.setdefault( - acctid, {} - ).setdefault(date, {})[tid] = trade - - ln = len(trades_by_account.values()) - log.info(f'Loaded {ln} trades from flex query') + # ln = len(trades) + # log.info(f'Loaded {ln} trades from flex query') for acctid, trades_by_id in trades_by_account.items(): with config.open_trade_ledger('ib', acctid) as ledger: - ledger.update({'ib': trades_by_id}) + ledger.update(trades_by_id) if __name__ == '__main__':