Add fast(er), time-sorted ledger records

Turns out that reading **and** writing with `tomlkit` is just wayya slow
for large documents like ledger files so move to using the `tomli`
sibling pkg `tomli-w` which seems to much improve on the latency, though
obviously longer run we're likely going to want:
- a better algorithm for only back loading records using as little
  history as possible
- a different serialization format for production maybe something
  like apache parquet?

The only issue with using a non-style-preserving writer is that we don't
necessarily get TOML conf ordering for free (without first ordering it
ourselves), and thus this patch also adds much more general date-time
sorting machinery which is now **required** when using
`open_trades_ledger()` via a `tx_sort: Callable`. By default we now
provide `.accounting._ledger.iter_by_dt()` (exposed in the subpkg mod)
which conducts dynamic "datetime key detection" based parsing of records
based on a `parsers: dict[str, Callabe]` input table. The default should
handle most use cases including all currently supported live backends
(kraken, ib) as well as our paper engine ledger-records format.

Granulars:
- adjust `Position.iter_clears()` to use new `iter_by_dt(key=lambda ..)`
  signature.
- add `tomli-w` to setup and our `tomlkit` fork to requirements file.
- move `.write_config()` to bottom of class defn.
- fix closed pos popping to not error if pp was already popped..
master
Tyler Goodlet 2023-05-18 18:07:12 -04:00
parent 89d24cfe33
commit c6da09f3c6
5 changed files with 113 additions and 63 deletions

View File

@ -22,6 +22,7 @@ for tendiez.
from ..log import get_logger
from ._ledger import (
iter_by_dt,
Transaction,
TransactionLedger,
open_trade_ledger,

View File

@ -32,10 +32,11 @@ from typing import (
from pendulum import (
datetime,
DateTime,
from_timestamp,
parse,
)
import tomlkit
import tomli
import tomli_w # for fast ledger writing
from .. import config
from ..data.types import Struct
@ -116,37 +117,13 @@ class TransactionLedger(UserDict):
self,
ledger_dict: dict,
file_path: Path,
tx_sort: Callable,
) -> None:
self.file_path = file_path
self.tx_sort = tx_sort
super().__init__(ledger_dict)
def write_config(self) -> None:
'''
Render the self.data ledger dict to it's TOML file form.
'''
towrite: dict[str, Any] = self.data.copy()
for tid, txdict in self.data.items():
# drop key for non-expiring assets
if (
'expiry' in txdict
and txdict['expiry'] is None
):
txdict.pop('expiry')
# re-write old acro-key
fqme = txdict.get('fqsn')
if fqme:
txdict['fqme'] = fqme
print(f'WRITING LEDGER {self.file_path}')
with self.file_path.open(mode='w') as fp:
tomlkit.dump(towrite, fp)
print(f'FINISHED WRITING LEDGER {self.file_path}')
def update_from_t(
self,
t: Transaction,
@ -182,6 +159,7 @@ class TransactionLedger(UserDict):
# and instead call it for each entry incrementally:
# normer = mod.norm_trade_record(txdict)
# TODO: use tx_sort here yah?
for tid, txdict in self.data.items():
# special field handling for datetimes
# to ensure pendulum is used!
@ -195,22 +173,20 @@ class TransactionLedger(UserDict):
# the ``.sys: MktPair`` info, so skip.
continue
yield (
tid,
Transaction(
fqsn=fqme,
tid=txdict['tid'],
dt=dt,
price=txdict['price'],
size=txdict['size'],
cost=txdict.get('cost', 0),
bs_mktid=txdict['bs_mktid'],
tx = Transaction(
fqsn=fqme,
tid=txdict['tid'],
dt=dt,
price=txdict['price'],
size=txdict['size'],
cost=txdict.get('cost', 0),
bs_mktid=txdict['bs_mktid'],
# TODO: change to .sys!
sym=mkt,
expiry=parse(expiry) if expiry else None,
)
# TODO: change to .sys!
sym=mkt,
expiry=parse(expiry) if expiry else None,
)
yield tid, tx
def to_trans(
self,
@ -223,12 +199,81 @@ class TransactionLedger(UserDict):
'''
return dict(self.iter_trans(**kwargs))
def write_config(
self,
) -> None:
'''
Render the self.data ledger dict to it's TOML file form.
'''
cpy = self.data.copy()
towrite: dict[str, Any] = {}
for tid, trans in cpy.items():
# drop key for non-expiring assets
txdict = towrite[tid] = self.data[tid]
if (
'expiry' in txdict
and txdict['expiry'] is None
):
txdict.pop('expiry')
# re-write old acro-key
fqme = txdict.get('fqsn')
if fqme:
txdict['fqme'] = fqme
with self.file_path.open(mode='wb') as fp:
tomli_w.dump(towrite, fp)
def iter_by_dt(
records: dict[str, Any],
# NOTE: parsers are looked up in the insert order
# so if you know that the record stats show some field
# is more common then others, stick it at the top B)
parsers: dict[tuple[str], Callable] = {
'dt': None, # parity case
'datetime': parse, # datetime-str
'time': from_timestamp, # float epoch
},
key: Callable | None = None,
) -> Iterator[tuple[str, dict]]:
'''
Iterate entries of a ``records: dict`` table sorted by entry recorded
datetime presumably set at the ``'dt'`` field in each entry.
'''
txs = records.items()
def dyn_parse_to_dt(
pair: tuple[str, dict],
) -> DateTime:
_, txdict = pair
k, v, parser = next(
(k, txdict[k], parsers[k]) for k in parsers if k in txdict
)
return parser(v) if parser else v
for tid, data in sorted(
records.items(),
key=key or dyn_parse_to_dt,
):
yield tid, data
@cm
def open_trade_ledger(
broker: str,
account: str,
# default is to sort by detected datetime-ish field
tx_sort: Callable = iter_by_dt,
) -> Generator[dict, None, None]:
'''
Indempotently create and read in a trade log file from the
@ -244,6 +289,7 @@ def open_trade_ledger(
ledger = TransactionLedger(
ledger_dict=cpy,
file_path=fpath,
tx_sort=tx_sort,
)
try:
yield ledger
@ -254,19 +300,3 @@ def open_trade_ledger(
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
log.info(f'Updating ledger for {fpath}:\n')
ledger.write_config()
def iter_by_dt(
clears: dict[str, Any],
) -> Iterator[tuple[str, dict]]:
'''
Iterate entries of a ``clears: dict`` table sorted by entry recorded
datetime presumably set at the ``'dt'`` field in each entry.
'''
for tid, data in sorted(
list(clears.items()),
key=lambda item: item[1]['dt'],
):
yield tid, data

View File

@ -307,10 +307,16 @@ class Position(Struct):
datetime-stamped order.
'''
return iter_by_dt(self.clears)
# sort on the already existing datetime that should have
# been generated for the entry's table
return iter_by_dt(
self.clears,
key=lambda entry: entry[1]['dt']
)
def calc_ppu(
self,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
@ -726,7 +732,15 @@ class PpTable(Struct):
if closed:
bs_mktid: str
for bs_mktid, pos in closed.items():
self.conf.pop(pos.symbol.fqme)
fqme: str = pos.symbol.fqme
if fqme in self.conf:
self.conf.pop(fqme)
else:
# TODO: we reallly need a diff set of
# loglevels/colors per subsys.
log.warning(
f'Recent position for {fqme} was closed!'
)
# if there are no active position entries according
# to the toml dump output above, then clear the config

View File

@ -13,3 +13,8 @@
# ``asyncvnc`` for sending interactions to ib-gw inside docker
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
# ``tomlkit`` for account files and configs; we've
# added some new features that need to get upstreamed:
-e git+https://github.com/pikers/tomlkit.git@writing_docs_tweaks#egg=tomlkit

View File

@ -44,8 +44,9 @@ setup(
]
},
install_requires=[
'tomlkit', # fork & fix for now:
# 'tomlkit', # fork & fix for now..
'tomli', # for pre-3.11
'tomli-w', # for fast ledger writing
'colorlog',
'attrs',
'pygments',
@ -65,8 +66,7 @@ setup(
# normally pinned to particular git hashes..
# 'tractor',
# 'asyncvnc',
# 'pyqtgraph',
# anyio-marketstore # mkts tsdb client
# 'anyio-marketstore', # mkts tsdb client
# brokers
'asks', # for non-ws rest apis