From ceca0d9fb7be15a113b809f95bfb53cdf1c7a6ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Oct 2022 13:36:41 -0400 Subject: [PATCH] Order ledger entries by processed datetime To make it easier to manually read/decipher long ledger files this adds `dict` sorting based on record-type-specific (api vs. flex report) datetime processing prior to ledger file write. - break up parsers into separate routines for flex and api record processing. - add `parse_flex_dt()` for special handling of the weird semicolon stamps in flex reports. --- piker/brokers/ib/broker.py | 197 ++++++++++++++++++++++++------------- 1 file changed, 127 insertions(+), 70 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index cdd1f99a..daf9a703 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -305,7 +305,7 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch conf = get_config() - entries = trades_to_ledger_entries( + entries = api_trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) @@ -371,8 +371,8 @@ async def update_and_audit_msgs( else: entry = f'split_ratio = 1/{int(reverse_split_ratio)}' - # raise ValueError( - log.error( + raise ValueError( + # log.error( f'POSITION MISMATCH ib <-> piker ledger:\n' f'ib: {ibppmsg}\n' f'piker: {msg}\n' @@ -1123,18 +1123,16 @@ def norm_trade_records( continue # timestamping is way different in API records + dtstr = record.get('datetime') date = record.get('date') - if not date: - # probably a flex record with a wonky non-std timestamp.. - date, ts = record['dateTime'].split(';') - dt = pendulum.parse(date) - ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' - tsdt = pendulum.parse(ts) - dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) + flex_dtstr = record.get('dateTime') - else: - # epoch_dt = pendulum.from_timestamp(record.get('time')) - dt = pendulum.parse(date) + if dtstr or date: + dt = pendulum.parse(dtstr or date) + + elif flex_dtstr: + # probably a flex record with a wonky non-std timestamp.. + dt = parse_flex_dt(record['dateTime']) # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. @@ -1183,69 +1181,58 @@ def norm_trade_records( return {r.tid: r for r in records} -def trades_to_ledger_entries( +def parse_flex_dt( + record: str, +) -> pendulum.datetime: + date, ts = record.split(';') + dt = pendulum.parse(date) + ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' + tsdt = pendulum.parse(ts) + return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) + + +def api_trades_to_ledger_entries( accounts: bidict, trade_entries: list[object], - source_type: str = 'api', ) -> dict: ''' - Convert either of API execution objects or flex report - entry objects into ``dict`` form, pretty much straight up - without modification. + Convert API execution objects entry objects into ``dict`` form, + pretty much straight up without modification except add + a `pydatetime` field from the parsed timestamp. ''' trades_by_account = {} - for t in trade_entries: - if source_type == 'flex': - entry = t.__dict__ + # NOTE: example of schema we pull from the API client. + # { + # 'commissionReport': CommissionReport(... + # 'contract': {... + # 'execution': Execution(... + # 'time': 1654801166.0 + # } - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave a table key as an `int`? So i guess - # cast to strs for all keys.. + # flatten all sub-dicts and values into one top level entry. + entry = {} + for section, val in t.items(): + match section: + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - tid = str(entry.get('ibExecID') or entry['tradeID']) - # date = str(entry['tradeDate']) + case 'time': + # ib has wack ns timestamps, or is that us? + continue - # XXX: is it going to cause problems if a account name - # get's lost? The user should be able to find it based - # on the actual exec history right? - acctid = accounts[str(entry['accountId'])] + case _: + entry[section] = val - elif source_type == 'api': - # NOTE: example of schema we pull from the API client. - # { - # 'commissionReport': CommissionReport(... - # 'contract': {... - # 'execution': Execution(... - # 'time': 1654801166.0 - # } - - # flatten all sub-dicts and values into one top level entry. - entry = {} - for section, val in t.items(): - match section: - case 'contract' | 'execution' | 'commissionReport': - # sub-dict cases - entry.update(val) - - case 'time': - # ib has wack ns timestamps, or is that us? - continue - - case _: - entry[section] = val - - tid = str(entry['execId']) - dt = pendulum.from_timestamp(entry['time']) - # TODO: why isn't this showing seconds in the str? - entry['date'] = str(dt) - acctid = accounts[entry['acctNumber']] + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? + entry['pydatetime'] = dt + entry['datetime'] = str(dt) + acctid = accounts[entry['acctNumber']] if not tid: # this is likely some kind of internal adjustment @@ -1263,6 +1250,73 @@ def trades_to_ledger_entries( acctid, {} )[tid] = entry + # sort entries in output by python based datetime + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + + return trades_by_account + + +def flex_records_to_ledger_entries( + accounts: bidict, + trade_entries: list[object], + +) -> dict: + ''' + Convert flex report entry objects into ``dict`` form, pretty much + straight up without modification except add a `pydatetime` field + from the parsed timestamp. + + ''' + trades_by_account = {} + for t in trade_entries: + entry = t.__dict__ + + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave a table key as an `int`? So i guess + # cast to strs for all keys.. + + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. + # trade['ibExecID'] + tid = str(entry.get('ibExecID') or entry['tradeID']) + # date = str(entry['tradeDate']) + + # XXX: is it going to cause problems if a account name + # get's lost? The user should be able to find it based + # on the actual exec history right? + acctid = accounts[str(entry['accountId'])] + + # probably a flex record with a wonky non-std timestamp.. + dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime']) + entry['datetime'] = str(dt) + + if not tid: + # this is likely some kind of internal adjustment + # transaction, likely one of the following: + # - an expiry event that will show a "book trade" indicating + # some adjustment to cash balances: zeroing or itm settle. + # - a manual cash balance position adjustment likely done by + # the user from the accounts window in TWS where they can + # manually set the avg price and size: + # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST + log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') + continue + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1]['pydatetime'], + )) + return trades_by_account @@ -1309,15 +1363,16 @@ def load_flex_trades( ln = len(trade_entries) log.info(f'Loaded {ln} trades from flex query') - trades_by_account = trades_to_ledger_entries( - # get reverse map to user account names - conf['accounts'].inverse, + trades_by_account = flex_records_to_ledger_entries( + conf['accounts'].inverse, # reverse map to user account names trade_entries, - source_type='flex', ) + ledger_dict: Optional[dict] = None + for acctid in trades_by_account: trades_by_id = trades_by_account[acctid] + with open_trade_ledger('ib', acctid) as ledger_dict: tid_delta = set(trades_by_id) - set(ledger_dict) log.info( @@ -1325,9 +1380,11 @@ def load_flex_trades( f'{pformat(tid_delta)}' ) if tid_delta: - ledger_dict.update( - {tid: trades_by_id[tid] for tid in tid_delta} - ) + sorted_delta = dict(sorted( + {tid: trades_by_id[tid] for tid in tid_delta}.items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + ledger_dict.update(sorted_delta) return ledger_dict