From 88b4ccc768d72f987e1106c6f71fcc073f1852e2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Jun 2022 13:25:08 -0400 Subject: [PATCH] Add API trade/exec entry parsing and ledger updates Since "flex reports" are only available for the current session's trades the day after, this adds support for also collecting trade execution records for the current session and writing them to the equivalent ledger file. Summary: - add `trades_to_records()` to handle parsing both flex and API event objects into a common record form. - add `norm_trade_records()` to handle converting ledger entries into `TradeRecord` types from the new `piker.pps` mod (coming in next commit). --- piker/brokers/ib/__init__.py | 5 +- piker/brokers/ib/api.py | 8 ++ piker/brokers/ib/broker.py | 209 ++++++++++++++++++++++++++++++----- 3 files changed, 193 insertions(+), 29 deletions(-) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 3f6504a1..5c329ecc 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -38,7 +38,10 @@ from .feed import ( open_symbol_search, stream_quotes, ) -from .broker import trades_dialogue +from .broker import ( + trades_dialogue, + norm_trade_records, +) __all__ = [ 'get_client', diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 044415fc..a68ee6fe 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -483,6 +483,14 @@ class Client: return con + async def get_con( + self, + conid: int, + ) -> Contract: + return await self.ib.qualifyContractsAsync( + ibis.Contract(conId=conid) + ) + async def find_contract( self, pattern: str, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index f9fab51d..2792c517 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -28,6 +28,7 @@ from typing import ( AsyncIterator, ) +from bidict import bidict import trio from trio_typing import TaskStatus import tractor @@ -44,8 +45,10 @@ from ib_insync.objects import ( Execution, ) from ib_insync.objects import Position +import pendulum from piker import config +from piker.pp import TradeRecord from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -56,6 +59,7 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) +from piker.data._source import Symbol from .api import ( _accounts2clients, _adhoc_futes_set, @@ -64,6 +68,7 @@ from .api import ( open_client_proxies, Client, ) +# from .feed import open_data_client def pack_position( @@ -95,7 +100,6 @@ def pack_position( symkey += f'.{expiry}' # TODO: options contracts into a sane format.. - return BrokerdPosition( broker='ib', account=pos.account, @@ -317,11 +321,72 @@ async def trades_dialogue( all_positions.append(msg.dict()) - trades: list[dict] = [] + trades_by_account: dict = {} + conf = get_config() for proxy in proxies.values(): - trades.append(await proxy.trades()) + trade_entries = await proxy.trades() + # { + # 'commissionReport': CommissionReport( + # execId='', + # commission=0.0, + # currency='', + # realizedPNL=0.0, + # yield_=0.0, + # yieldRedemptionDate=0), + # 'contract': { + # 'comboLegs': [], + # 'comboLegsDescrip': '', + # 'conId': 477837024, + # 'currency': 'USD', + # 'deltaNeutralContract': None, + # 'exchange': 'GLOBEX', + # 'includeExpired': False, + # 'lastTradeDateOrContractMonth': '20220617', + # 'localSymbol': 'MNQM2', + # 'multiplier': '2', + # 'primaryExchange': '', + # 'right': '?', + # 'secId': '', + # 'secIdType': '', + # 'secType': 'FUT', + # 'strike': 0.0, + # 'symbol': 'MNQ', + # 'tradingClass': 'MNQ' + # }, + # 'execution': Execution( + # execId='0000e1a7.62a2315f.01.01', + # time=1654801166.0, + # acctNumber='DU5612476', + # exchange='GLOBEX', + # side='BOT', + # shares=1.0, + # price=12443.5, + # permId=778998556, + # clientId=6116, + # orderId=555, + # liquidation=0, + # cumQty=1.0, + # avgPrice=12443.5, + # orderRef='', + # evRule='', + # evMultiplier=0.0, + # modelCode='', + # lastLiquidity=1 + # ), + # 'time': 1654801166.0 + # } + trades_by_account.update( + trades_to_records( + conf['accounts'].inverse, + trade_entries, + ) + ) - log.info(f'Loaded {len(trades)} from this session') + for acctid, trades_by_id in trades_by_account.items(): + with config.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + + # log.info(f'Loaded {len(trades)} from this session') # TODO: write trades to local ``trades.toml`` # - use above per-session trades data and write to local file # - get the "flex reports" working and pull historical data and @@ -514,12 +579,114 @@ async def deliver_trade_events( await ems_stream.send(msg.dict()) +def norm_trade_records( + ledger: dict[str, Any], + +) -> dict[str, list[TradeRecord]]: + ''' + Normalize a flex report or API retrieved executions + ledger into our standard record format. + + ''' + records: list[TradeRecord] = [] + # async with open_data_client() as proxy: + for tid, record in ledger.items(): + # date, time = record['dateTime'] + # cost = record['cost'] + # action = record['buySell'] + conid = record.get('conId') or record['conid'] + comms = record.get('ibCommission', 0) + price = record.get('price') or record['tradePrice'] + size = record.get('shares') or record['quantity'] + + symbol = record['symbol'] + + # special handling of symbol extraction from + # flex records using some ad-hoc schema parsing. + instr = record.get('assetCategory') + if instr == 'FUT': + symbol = record['description'][:3] + + # try to build out piker fqsn from record. + expiry = record.get('lastTradeDateOrContractMonth') or record['expiry'] + exch = record.get('listingExchange') or record['exchange'] + + fqsn = Symbol.from_broker_info( + broker='ib', + symbol=symbol, + suffix=f'{exch}.{expiry}', + info={}, + ).front_fqsn() + + # NOTE: for flex records the normal fields won't be available so + # we have to do a lookup at some point to reverse map the conid + # to a fqsn. + + # con = await proxy.get_con(conid) + + records.append(TradeRecord( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + symkey=conid, + )) + + return records + + +def trades_to_records( + accounts: bidict, + trade_entries: list[object], + source_type: str = 'api', + +) -> dict: + + trades_by_account = {} + + for t in trade_entries: + if source_type == 'flex': + entry = t.__dict__ + + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. + # trade['ibExecID'] + + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave this as an ``int``? + tid = str(entry['tradeID']) + # date = str(entry['tradeDate']) + acctid = accounts[str(entry['accountId'])] + + elif source_type == 'api': + entry = {} + for section, obj in t.items(): + match section: + case 'commisionReport' | 'execution': + entry.update(asdict(obj)) + + case 'contract': + entry.update(obj) + + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + entry['date'] = str(dt) + acctid = accounts[entry['acctNumber']] + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + return trades_by_account + + def load_flex_trades( path: Optional[str] = None, ) -> dict[str, str]: - from pprint import pprint from ib_insync import flexreport, util conf = get_config() @@ -555,33 +722,19 @@ def load_flex_trades( report = flexreport.FlexReport(path=path) trade_entries = report.extract('Trade') + trades_by_account = trades_to_records( + # get reverse map to user account names + conf['accounts'].inverse, + trade_entries, + source_type='flex', + ) - # get reverse map to user account names - accounts = conf['accounts'].inverse - trades_by_account = {} - - for t in trade_entries: - - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - trade = t.__dict__ - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - tid = str(trade['tradeID']) - date = str(trade['tradeDate']) - acctid = accounts[str(trade['accountId'])] - trades_by_account.setdefault( - acctid, {} - ).setdefault(date, {})[tid] = trade - - ln = len(trades_by_account.values()) - log.info(f'Loaded {ln} trades from flex query') + # ln = len(trades) + # log.info(f'Loaded {ln} trades from flex query') for acctid, trades_by_id in trades_by_account.items(): with config.open_trade_ledger('ib', acctid) as ledger: - ledger.update({'ib': trades_by_id}) + ledger.update(trades_by_id) if __name__ == '__main__':