Move df loading into `calc.load_ledger_dfs()`

To isolate it from the ledger/account mods, and because it is actually
for doing (eventual) position calculations/analysis, it might as well
live in this mod. Add in the old-masked `ensure_state()` method content
in case we want to use it later for testing. Also tighten up the parser
loading inside `dyn_parse_to_dt()`.
account_tests
Tyler Goodlet 2023-07-10 09:11:15 -04:00
parent f5d4f58610
commit 8f1983fd8e
1 changed file with 166 additions and 12 deletions


@@ -20,6 +20,7 @@ you know when you're losing money (if possible) XD
'''
from __future__ import annotations
+from contextlib import contextmanager as cm
from math import copysign
from typing import (
    Any,
@ -28,6 +29,7 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
import polars as pl
from pendulum import ( from pendulum import (
# datetime, # datetime,
DateTime, DateTime,
@@ -40,7 +42,6 @@ if TYPE_CHECKING:
        Transaction,
    )
-

def ppu(
    clears: Iterator[Transaction],
@ -219,7 +220,7 @@ def iter_by_dt(
# NOTE: parsers are looked up in the insert order # NOTE: parsers are looked up in the insert order
# so if you know that the record stats show some field # so if you know that the record stats show some field
# is more common then others, stick it at the top B) # is more common then others, stick it at the top B)
parsers: dict[tuple[str], Callable] = { parsers: dict[str, Callable | None] = {
'dt': None, # parity case 'dt': None, # parity case
'datetime': parse, # datetime-str 'datetime': parse, # datetime-str
'time': from_timestamp, # float epoch 'time': from_timestamp, # float epoch
@@ -232,10 +233,8 @@ def iter_by_dt(
    datetime presumably set at the ``'dt'`` field in each entry.

    '''
-    # isdict: bool = False
    if isinstance(records, dict):
-        # isdict: bool = True
-        records = list(records.items())
+        records: list[tuple[str, dict]] = list(records.items())

    def dyn_parse_to_dt(
        tx: tuple[str, dict[str, Any]] | Transaction,
@@ -255,17 +254,17 @@ def iter_by_dt(
                or getattr(tx, k, None)
            ):
                v = tx[k] if isdict else tx.dt
-                if v is None:
-                    breakpoint()
-
-                parser = parsers[k]
-
-                # only call parser on the value if not None from the
-                # `parsers` table above, otherwise pass through the value
-                # and sort on it directly
+                assert v is not None, f'No valid value for `{k}`!?'

+                # only call parser on the value if not None from
+                # the `parsers` table above (when NOT using
+                # `.get()`), otherwise pass through the value and
+                # sort on it directly
+                parser: Callable | None = parsers[k]
                return parser(v) if (parser is not None) else v
            else:
+                # XXX: should never get here..
                breakpoint()

    entry: tuple[str, dict] | Transaction
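
As an aside for reviewers, the ordered-parser-table trick that
`dyn_parse_to_dt()` uses boils down to the following self-contained
sketch; the record fields and stdlib parsers here are stand-ins for
illustration only (the real code uses pendulum's `parse` and
`from_timestamp`):

    from datetime import datetime, timezone
    from typing import Callable

    # hypothetical records, each spelling its datetime differently
    records: dict[str, dict] = {
        'tx1': {'time': 1689000000.0},  # float epoch
        'tx2': {'datetime': '2023-07-09T12:00:00+00:00'},  # datetime-str
        'tx3': {'dt': datetime(2023, 7, 8, tzinfo=timezone.utc)},  # parity
    }

    # looked up in insert order, so put the most common field first
    parsers: dict[str, Callable | None] = {
        'dt': None,  # already a datetime, pass through
        'datetime': datetime.fromisoformat,
        'time': lambda v: datetime.fromtimestamp(v, tz=timezone.utc),
    }

    def dyn_parse_to_dt(item: tuple[str, dict]) -> datetime:
        _tid, rec = item
        for k, parser in parsers.items():
            if (v := rec.get(k)) is not None:
                # only call the parser when one is declared in the
                # table, otherwise sort on the value directly
                return parser(v) if parser is not None else v
        raise ValueError(f'no datetime field in {rec}')

    for tid, rec in sorted(records.items(), key=dyn_parse_to_dt):
        print(tid)  # -> tx3, tx2, tx1 (ascending dt order)
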
@@ -274,3 +273,158 @@ def iter_by_dt(
        key=key or dyn_parse_to_dt,
    ):
        yield entry

# TODO: probably just move this into the test suite or
# keep it here for use as such?
# def ensure_state(self) -> None:
#     '''
#     Audit either the `.cumsize` and `.ppu` local instance vars against
#     the clears table calculations and return the calc-ed values if
#     they differ and log warnings to console.
#
#     '''
#     # clears: list[dict] = self._clears
#     # self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt']
#     last_clear: dict = clears[-1]
#     csize: float = self.calc_size()
#     accum: float = last_clear['accum_size']
#
#     if not self.expired():
#         if (
#             csize != accum
#             and csize != round(accum * (self.split_ratio or 1))
#         ):
#             raise ValueError(f'Size mismatch: {csize}')
#     else:
#         assert csize == 0, 'Contract is expired but non-zero size?'
#
#     if self.cumsize != csize:
#         log.warning(
#             'Position state mismatch:\n'
#             f'{self.cumsize} => {csize}'
#         )
#         self.cumsize = csize
#
#     cppu: float = self.calc_ppu()
#     ppu: float = last_clear['ppu']
#     if (
#         cppu != ppu
#         and self.split_ratio is not None
#         # handle any split info entered (for now) manually by user
#         and cppu != (ppu / self.split_ratio)
#     ):
#         raise ValueError(f'PPU mismatch: {cppu}')
#
#     if self.ppu != cppu:
#         log.warning(
#             'Position state mismatch:\n'
#             f'{self.ppu} => {cppu}'
#         )
#         self.ppu = cppu
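
If the masked `ensure_state()` content does get resurrected for the
test suite, the audit it sketches reduces to: recompute size from the
clears table and warn (or raise) on drift. A rough standalone sketch of
just the size half, with all names assumed for illustration rather than
taken from the actual `Position` API:

    import logging

    log = logging.getLogger(__name__)

    def audit_cumsize(
        clears: list[dict],  # hypothetical clears-table rows
        cumsize: float,  # locally cached position size
        split_ratio: float | None = None,
    ) -> float:
        # recompute cumulative size the way the masked code's
        # `calc_size()` presumably does: by summing clear sizes
        csize: float = sum(c['size'] for c in clears)
        accum: float = clears[-1]['accum_size']

        if (
            csize != accum
            and csize != round(accum * (split_ratio or 1))
        ):
            raise ValueError(f'Size mismatch: {csize}')

        if cumsize != csize:
            log.warning(f'Position state mismatch:\n{cumsize} => {csize}')

        return csize
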

@cm
def open_ledger_dfs(
    brokername: str,
    acctname: str,

) -> dict[str, pl.DataFrame]:
    '''
    Open a ledger of trade records (presumably from some broker
    backend), normalize the records into `Transactions` via the
    backend's declared endpoint, cast to a `polars.DataFrame` which
    can update the ledger on exit.

    '''
    from ._ledger import (
        open_trade_ledger,
        # Transaction,
        TransactionLedger,
    )

    ledger: TransactionLedger
    import time
    now = time.time()

    with (
        open_trade_ledger(
            brokername,
            acctname,
            rewrite=True,
        ) as ledger,
    ):
        if not ledger:
            raise ValueError(f'No ledger for {acctname}@{brokername} exists?')

        print(f'LEDGER LOAD TIME: {time.time() - now}')

        # if acctname == 'paper':
        #     txns: dict[str, Transaction] = ledger.to_trans()
        # else:

        # process raw TOML ledger into txns using the
        # appropriate backend normalizer.
        # cache: AssetsInfo = get_symcache(
        #     brokername,
        #     allow_reload=True,
        # )

        txns: dict[str, Transaction]
        if acctname != 'paper':
            txns = ledger.mod.norm_trade_records(ledger)
        else:
            txns = ledger.to_txns()

        ldf = pl.DataFrame(
            list(txn.to_dict() for txn in txns.values()),
            # schema=[
            #     ('tid', str),
            #     ('fqme', str),
            #     ('dt', str),
            #     ('size', pl.Float64),
            #     ('price', pl.Float64),
            #     ('cost', pl.Float64),
            #     ('expiry', str),
            #     ('bs_mktid', str),
            # ],
        ).sort('dt').select([
            pl.col('fqme'),
            pl.col('dt').str.to_datetime(),
            # pl.col('expiry').dt.datetime(),
            pl.col('bs_mktid'),
            pl.col('size'),
            pl.col('price'),
        ])

        # filter out to the columns matching values filter passed
        # as input.
        # if filter_by_ids:
        #     for col, vals in filter_by_ids.items():
        #         str_vals = set(map(str, vals))
        #         pred: pl.Expr = pl.col(col).eq(str_vals.pop())
        #         for val in str_vals:
        #             pred |= pl.col(col).eq(val)

        #     fdf = df.filter(pred)

        #     bs_mktid: str = fdf[0]['bs_mktid']
        #     pos: Position = acnt.pps[bs_mktid]

        # ppt = df.groupby('fqme').agg([
        #     # TODO: ppu and bep !!
        #     pl.cumsum('size').alias('cumsum'),
        # ])
        dfs: dict[str, pl.DataFrame] = ldf.partition_by(
            'fqme',
            as_dict=True,
        )

        # for fqme, ppt in act.items():
        #     ppt.with_columns
        #     # TODO: ppu and bep !!
        #     pl.cumsum('size').alias('cumsum'),
        # ])

        yield dfs
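
For reference, a minimal caller sketch under assumed inputs: the
broker/account pair is hypothetical and must already have a ledger on
disk. It also adds the per-`fqme` cumulative-size column the
`# TODO: ppu and bep !!` notes point toward (`.cum_sum()` is spelled
`.cumsum()` in older polars releases):

    import polars as pl

    with open_ledger_dfs('binance', 'paper') as dfs:
        for fqme, df in dfs.items():
            # running position size per market, already sorted by `dt`
            ppt = df.with_columns(
                pl.col('size').cum_sum().alias('cumsize'),
            )
            print(fqme, ppt.tail(1))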