Order ledger entries by processed datetime

To make it easier to manually read/decipher long ledger files this adds
`dict` sorting based on record-type-specific (api vs. flex report)
datetime processing prior to ledger file write.

- break up parsers into separate routines for flex and api record
  processing.
- add `parse_flex_dt()` for special handling of the weird semicolon
  stamps in flex reports.
ib_1m_hist
Tyler Goodlet 2022-10-07 13:36:41 -04:00
parent df16726211
commit ceca0d9fb7
1 changed files with 127 additions and 70 deletions

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch entry['listingExchange'] = pexch
conf = get_config() conf = get_config()
entries = trades_to_ledger_entries( entries = api_trades_to_ledger_entries(
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,
) )
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else: else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}' entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
# raise ValueError( raise ValueError(
log.error( # log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n' f'ib: {ibppmsg}\n'
f'piker: {msg}\n' f'piker: {msg}\n'
@ -1123,18 +1123,16 @@ def norm_trade_records(
continue continue
# timestamping is way different in API records # timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date') date = record.get('date')
if not date: flex_dtstr = record.get('dateTime')
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else: if dtstr or date:
# epoch_dt = pendulum.from_timestamp(record.get('time')) dt = pendulum.parse(dtstr or date)
dt = pendulum.parse(date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
@ -1183,69 +1181,58 @@ def norm_trade_records(
return {r.tid: r for r in records} return {r.tid: r for r in records}
def trades_to_ledger_entries( def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict, accounts: bidict,
trade_entries: list[object], trade_entries: list[object],
source_type: str = 'api',
) -> dict: ) -> dict:
''' '''
Convert either of API execution objects or flex report Convert API execution objects entry objects into ``dict`` form,
entry objects into ``dict`` form, pretty much straight up pretty much straight up without modification except add
without modification. a `pydatetime` field from the parsed timestamp.
''' '''
trades_by_account = {} trades_by_account = {}
for t in trade_entries: for t in trade_entries:
if source_type == 'flex': # NOTE: example of schema we pull from the API client.
entry = t.__dict__ # {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# XXX: LOL apparently ``toml`` has a bug # flatten all sub-dicts and values into one top level entry.
# where a section key error will show up in the write entry = {}
# if you leave a table key as an `int`? So i guess for section, val in t.items():
# cast to strs for all keys.. match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
# oddly for some so-called "BookTrade" entries case 'time':
# this field seems to be blank, no cuckin clue. # ib has wack ns timestamps, or is that us?
# trade['ibExecID'] continue
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name case _:
# get's lost? The user should be able to find it based entry[section] = val
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api': tid = str(entry['execId'])
# NOTE: example of schema we pull from the API client. dt = pendulum.from_timestamp(entry['time'])
# { # TODO: why isn't this showing seconds in the str?
# 'commissionReport': CommissionReport(... entry['pydatetime'] = dt
# 'contract': {... entry['datetime'] = str(dt)
# 'execution': Execution(... acctid = accounts[entry['acctNumber']]
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
case 'time':
# ib has wack ns timestamps, or is that us?
continue
case _:
entry[section] = val
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid: if not tid:
# this is likely some kind of internal adjustment # this is likely some kind of internal adjustment
@ -1263,6 +1250,73 @@ def trades_to_ledger_entries(
acctid, {} acctid, {}
)[tid] = entry )[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account return trades_by_account
@ -1309,15 +1363,16 @@ def load_flex_trades(
ln = len(trade_entries) ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query') log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries( trades_by_account = flex_records_to_ledger_entries(
# get reverse map to user account names conf['accounts'].inverse, # reverse map to user account names
conf['accounts'].inverse,
trade_entries, trade_entries,
source_type='flex',
) )
ledger_dict: Optional[dict] = None
for acctid in trades_by_account: for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid] trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict: with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict) tid_delta = set(trades_by_id) - set(ledger_dict)
log.info( log.info(
@ -1325,9 +1380,11 @@ def load_flex_trades(
f'{pformat(tid_delta)}' f'{pformat(tid_delta)}'
) )
if tid_delta: if tid_delta:
ledger_dict.update( sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta} {tid: trades_by_id[tid] for tid in tid_delta}.items(),
) key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
return ledger_dict return ledger_dict