Order ledger entries by processed datetime

To make it easier to manually read/decipher long ledger files this adds
`dict` sorting based on record-type-specific (api vs. flex report)
datetime processing prior to ledger file write.

- break up parsers into separate routines for flex and api record
  processing.
- add `parse_flex_dt()` for special handling of the weird semicolon
  stamps in flex reports.
clears_table_events
Tyler Goodlet 2022-10-07 13:36:41 -04:00
parent d2b6216994
commit e8ab28e456
1 changed files with 127 additions and 70 deletions

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = trades_to_ledger_entries(
entries = api_trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
# raise ValueError(
log.error(
raise ValueError(
# log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
@ -1123,18 +1123,16 @@ def norm_trade_records(
continue
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
flex_dtstr = record.get('dateTime')
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
@ -1183,69 +1181,58 @@ def norm_trade_records(
return {r.tid: r for r in records}
def trades_to_ledger_entries(
def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
Convert API execution objects entry objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
case 'time':
# ib has wack ns timestamps, or is that us?
continue
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
case _:
entry[section] = val
elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
case 'time':
# ib has wack ns timestamps, or is that us?
continue
case _:
entry[section] = val
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['pydatetime'] = dt
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid:
# this is likely some kind of internal adjustment
@ -1263,6 +1250,73 @@ def trades_to_ledger_entries(
acctid, {}
)[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account
@ -1309,15 +1363,16 @@ def load_flex_trades(
ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trades_by_account = flex_records_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names
trade_entries,
source_type='flex',
)
ledger_dict: Optional[dict] = None
for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
@ -1325,9 +1380,11 @@ def load_flex_trades(
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
return ledger_dict