diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index 034c810a..62978850 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -20,6 +20,7 @@ you know when you're losing money (if possible) XD ''' from __future__ import annotations +from contextlib import contextmanager as cm from math import copysign from typing import ( Any, @@ -28,6 +29,7 @@ from typing import ( TYPE_CHECKING, ) +import polars as pl from pendulum import ( # datetime, DateTime, @@ -40,7 +42,6 @@ if TYPE_CHECKING: Transaction, ) - def ppu( clears: Iterator[Transaction], @@ -219,7 +220,7 @@ def iter_by_dt( # NOTE: parsers are looked up in the insert order # so if you know that the record stats show some field # is more common then others, stick it at the top B) - parsers: dict[tuple[str], Callable] = { + parsers: dict[str, Callable | None] = { 'dt': None, # parity case 'datetime': parse, # datetime-str 'time': from_timestamp, # float epoch @@ -232,10 +233,8 @@ def iter_by_dt( datetime presumably set at the ``'dt'`` field in each entry. ''' - # isdict: bool = False if isinstance(records, dict): - # isdict: bool = True - records = list(records.items()) + records: list[tuple[str, dict]] = list(records.items()) def dyn_parse_to_dt( tx: tuple[str, dict[str, Any]] | Transaction, @@ -255,17 +254,17 @@ def iter_by_dt( or getattr(tx, k, None) ): v = tx[k] if isdict else tx.dt - if v is None: - breakpoint() + assert v is not None, f'No valid value for `{k}`!?' - parser = parsers[k] - - # only call parser on the value if not None from the - # `parsers` table above, otherwise pass through the value - # and sort on it directly + # only call parser on the value if not None from + # the `parsers` table above (when NOT using + # `.get()`), otherwise pass through the value and + # sort on it directly + parser: Callable | None = parsers[k] return parser(v) if (parser is not None) else v else: + # XXX: should never get here.. breakpoint() entry: tuple[str, dict] | Transaction @@ -274,3 +273,158 @@ def iter_by_dt( key=key or dyn_parse_to_dt, ): yield entry + + +# TODO: probably just move this into the test suite or +# keep it here for use from as such? +# def ensure_state(self) -> None: +# ''' +# Audit either the `.cumsize` and `.ppu` local instance vars against +# the clears table calculations and return the calc-ed values if +# they differ and log warnings to console. + +# ''' +# # clears: list[dict] = self._clears + +# # self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] +# last_clear: dict = clears[-1] +# csize: float = self.calc_size() +# accum: float = last_clear['accum_size'] + +# if not self.expired(): +# if ( +# csize != accum +# and csize != round(accum * (self.split_ratio or 1)) +# ): +# raise ValueError(f'Size mismatch: {csize}') +# else: +# assert csize == 0, 'Contract is expired but non-zero size?' + +# if self.cumsize != csize: +# log.warning( +# 'Position state mismatch:\n' +# f'{self.cumsize} => {csize}' +# ) +# self.cumsize = csize + +# cppu: float = self.calc_ppu() +# ppu: float = last_clear['ppu'] +# if ( +# cppu != ppu +# and self.split_ratio is not None + +# # handle any split info entered (for now) manually by user +# and cppu != (ppu / self.split_ratio) +# ): +# raise ValueError(f'PPU mismatch: {cppu}') + +# if self.ppu != cppu: +# log.warning( +# 'Position state mismatch:\n' +# f'{self.ppu} => {cppu}' +# ) +# self.ppu = cppu + + +@cm +def open_ledger_dfs( + + brokername: str, + acctname: str, + +) -> dict[str, pl.DataFrame]: + ''' + Open a ledger of trade records (presumably from some broker + backend), normalize the records into `Transactions` via the + backend's declared endpoint, cast to a `polars.DataFrame` which + can update the ledger on exit. + + ''' + from ._ledger import ( + open_trade_ledger, + # Transaction, + TransactionLedger, + ) + + ledger: TransactionLedger + import time + now = time.time() + with ( + open_trade_ledger( + brokername, + acctname, + rewrite=True, + ) as ledger, + ): + if not ledger: + raise ValueError(f'No ledger for {acctname}@{brokername} exists?') + + print(f'LEDGER LOAD TIME: {time.time() - now}') + # if acctname == 'paper': + # txns: dict[str, Transaction] = ledger.to_trans() + # else: + + # process raw TOML ledger into txns using the + # appropriate backend normalizer. + # cache: AssetsInfo = get_symcache( + # brokername, + # allow_reload=True, + # ) + + txns: dict[str, Transaction] + if acctname != 'paper': + txns = ledger.mod.norm_trade_records(ledger) + else: + txns = ledger.to_txns() + + ldf = pl.DataFrame( + list(txn.to_dict() for txn in txns.values()), + # schema=[ + # ('tid', str), + # ('fqme', str), + # ('dt', str), + # ('size', pl.Float64), + # ('price', pl.Float64), + # ('cost', pl.Float64), + # ('expiry', str), + # ('bs_mktid', str), + # ], + ).sort('dt').select([ + pl.col('fqme'), + pl.col('dt').str.to_datetime(), + # pl.col('expiry').dt.datetime(), + pl.col('bs_mktid'), + pl.col('size'), + pl.col('price'), + ]) + + # filter out to the columns matching values filter passed + # as input. + # if filter_by_ids: + # for col, vals in filter_by_ids.items(): + # str_vals = set(map(str, vals)) + # pred: pl.Expr = pl.col(col).eq(str_vals.pop()) + # for val in str_vals: + # pred |= pl.col(col).eq(val) + + # fdf = df.filter(pred) + + # bs_mktid: str = fdf[0]['bs_mktid'] + # pos: Position = acnt.pps[bs_mktid] + + # ppt = df.groupby('fqme').agg([ + # # TODO: ppu and bep !! + # pl.cumsum('size').alias('cumsum'), + # ]) + + dfs: dict[str, pl.DataFrame] = ldf.partition_by( + 'fqme', + as_dict=True, + ) + + # for fqme, ppt in act.items(): + # ppt.with_columns + # # TODO: ppu and bep !! + # pl.cumsum('size').alias('cumsum'), + # ]) + yield dfs