From 3d20490ee55247663a02a129d379eb91860274e4 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 15 Jul 2023 15:43:09 -0400
Subject: [PATCH] Move cum-calcs to `open_ledger_dfs()`, always parse `str`->`DateTime`

Previously the cum-size calc(s) were in the `disect` CLI but they're
better stuffed into the backing df converter. Also, ensure that whenever
a `dt` field is type-detected as a `str` we parse it to `DateTime`.
---
 piker/accounting/calc.py | 197 ++++++++++++++++++++++-----------------
 piker/accounting/cli.py  |  17 ++--
 2 files changed, 117 insertions(+), 97 deletions(-)

diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py
index 90ad1cf7..db777140 100644
--- a/piker/accounting/calc.py
+++ b/piker/accounting/calc.py
@@ -20,6 +20,7 @@ you know when you're losing money (if possible) XD

 '''
 from __future__ import annotations
+from collections.abc import ValuesView
 from contextlib import contextmanager as cm
 from math import copysign
 from typing import (
@@ -39,6 +40,7 @@ from pendulum import (
 if TYPE_CHECKING:
     from ._ledger import (
         Transaction,
+        TransactionLedger,
     )


 def ppu(
@@ -72,18 +74,23 @@ def ppu(
     where `cost_basis` for the current step is simply the price
     * size of the most recent clearing transaction.

+    -----
+    TODO: get the BEP computed and working similarly!
+    -----
+    the equivalent "break even price" or bep at each new clear
+    event step conversely only changes on a "position exiting"
+    clear which **decreases** the cumulative dst asset size:
+
+    bep[-1] = ppu[-1] - (cum_pnl[-1] / cumsize[-1])
+
     '''
     asize_h: list[float] = []  # historical accumulative size
     ppu_h: list[float] = []  # historical price-per-unit
     ledger: dict[str, dict] = {}

-    # entry: dict[str, Any] | Transaction
     t: Transaction
     for t in clears:
-        # tid: str = entry['tid']
-        # clear_size = entry['size']
         clear_size: float = t.size
-        # clear_price: str | float = entry['price']
         clear_price: str | float = t.price
         is_clear: bool = not isinstance(clear_price, str)

@@ -152,7 +159,6 @@ def ppu(
                 clear_price * abs(clear_size)
                 +
                 # transaction cost
-                # accum_sign * cost_scalar * entry['cost']
                 accum_sign * cost_scalar * t.cost
             )

@@ -187,13 +193,13 @@ def ppu(
         asize_h.append(accum_size)

         # ledger[t.tid] = {
-        #     'tx': t,
+        #     'txn': t,
         ledger[t.tid] = t.to_dict() | {
             'ppu': ppu,
             'cumsize': accum_size,
             'sign_change': sign_change,

-            # TODO: cumpnl, bep
+            # TODO: cum_pnl, bep
         }

     final_ppu = ppu_h[-1] if ppu_h else 0
@@ -212,6 +218,7 @@ def ppu(
 def iter_by_dt(
     records: (
         dict[str, dict[str, Any]]
+        | ValuesView[dict]  # eg. `Position._events.values()`
         | list[dict]
         | list[Transaction]  # XXX preferred!
     ),
@@ -220,7 +227,7 @@ def iter_by_dt(
     # so if you know that the record stats show some field
     # is more common then others, stick it at the top B)
     parsers: dict[str, Callable | None] = {
-        'dt': None,  # parity case
+        'dt': parse,  # parity case
         'datetime': parse,  # datetime-str
         'time': from_timestamp,  # float epoch
     },
@@ -259,8 +266,13 @@ def iter_by_dt(
             # the `parsers` table above (when NOT using
             # `.get()`), otherwise pass through the value and
             # sort on it directly
-            parser: Callable | None = parsers[k]
-            return parser(v) if (parser is not None) else v
+            if (
+                not isinstance(v, DateTime)
+                and (parser := parsers.get(k))
+            ):
+                return parser(v)
+            else:
+                return v

         else:
             # XXX: should never get here..
@@ -271,6 +283,7 @@ def iter_by_dt(
         records,
         key=key or dyn_parse_to_dt,
     ):
+        # NOTE the type sig above; either pairs or txns B)
         yield entry


@@ -331,7 +344,14 @@ def open_ledger_dfs(

     brokername: str,
     acctname: str,

-) -> dict[str, pl.DataFrame]:
+    ledger: TransactionLedger | None = None,
+
+    **kwargs,
+
+) -> tuple[
+    dict[str, pl.DataFrame],
+    TransactionLedger,
+]:
     '''
     Open a ledger of trade records (presumably from some broker
     backend), normalize the records into `Transactions` via the
@@ -341,86 +361,89 @@ def open_ledger_dfs(
     '''
     from ._ledger import (
         open_trade_ledger,
-        # Transaction,
-        TransactionLedger,
     )
-    ledger: TransactionLedger
-    import time
-    now = time.time()
-    with (
-        open_trade_ledger(
-            brokername,
-            acctname,
-            rewrite=True,
-            allow_from_sync_code=True,
-        ) as ledger,
-    ):
-        if not ledger:
-            raise ValueError(f'No ledger for {acctname}@{brokername} exists?')
+    if not ledger:
+        import time
+        from tractor._debug import open_crash_handler
+        now = time.time()

-        print(f'LEDGER LOAD TIME: {time.time() - now}')
-        # process raw TOML ledger into txns using the
-        # appropriate backend normalizer.
-        # cache: AssetsInfo = get_symcache(
-        #     brokername,
-        #     allow_reload=True,
-        # )
+        with (
+            open_crash_handler(),

-        txns: dict[str, Transaction]
-        if acctname != 'paper':
-            txns = ledger.mod.norm_trade_records(ledger)
-        else:
-            txns = ledger.to_txns()
+            open_trade_ledger(
+                brokername,
+                acctname,
+                rewrite=True,
+                allow_from_sync_code=True,

-        ldf = pl.DataFrame(
-            list(txn.to_dict() for txn in txns.values()),
-            # schema=[
-            #     ('tid', str),
-            #     ('fqme', str),
-            #     ('dt', str),
-            #     ('size', pl.Float64),
-            #     ('price', pl.Float64),
-            #     ('cost', pl.Float64),
-            #     ('expiry', str),
-            #     ('bs_mktid', str),
-            # ],
-        ).sort('dt').select([
-            pl.col('fqme'),
-            pl.col('dt').str.to_datetime(),
-            # pl.col('expiry').dt.datetime(),
-            pl.col('bs_mktid'),
-            pl.col('size'),
-            pl.col('price'),
+                # proxied through from caller
+                **kwargs,
+
+            ) as ledger,
+        ):
+            if not ledger:
+                raise ValueError(f'No ledger for {acctname}@{brokername} exists?')
+
+            print(f'LEDGER LOAD TIME: {time.time() - now}')
+
+            # process raw TOML ledger into txns using the
+            # appropriate backend normalizer.
+            # cache: AssetsInfo = get_symcache(
+            #     brokername,
+            #     allow_reload=True,
+            # )
+
+            txns: dict[str, Transaction] = ledger.to_txns()
+            ldf = pl.DataFrame(
+                list(txn.to_dict() for txn in txns.values()),
+                # schema=[
+                #     ('tid', str),
+                #     ('fqme', str),
+                #     ('dt', str),
+                #     ('size', pl.Float64),
+                #     ('price', pl.Float64),
+                #     ('cost', pl.Float64),
+                #     ('expiry', str),
+                #     ('bs_mktid', str),
+                # ],
+            # ).sort('dt').select([
+            ).sort('dt').with_columns([
+                # pl.col('fqme'),
+                pl.col('dt').str.to_datetime(),
+                # pl.col('expiry').dt.datetime(),
+                # pl.col('bs_mktid'),
+                # pl.col('size'),
+                # pl.col('price'),
+            ])
+
+            # filter out to the columns matching values filter passed
+            # as input.
+            # if filter_by_ids:
+            #     for col, vals in filter_by_ids.items():
+            #         str_vals = set(map(str, vals))
+            #         pred: pl.Expr = pl.col(col).eq(str_vals.pop())
+            #         for val in str_vals:
+            #             pred |= pl.col(col).eq(val)
+
+            # fdf = df.filter(pred)
+
+            # bs_mktid: str = fdf[0]['bs_mktid']
+            # pos: Position = acnt.pps[bs_mktid]
+
+            # TODO: not sure if this is even possible but..
+            # ppt = df.groupby('fqme').agg([
+            #     # TODO: ppu and bep !!
+            #     pl.cumsum('size').alias('cumsum'),
+            # ])
+
+            dfs: dict[str, pl.DataFrame] = ldf.partition_by(
+                'fqme',
+                as_dict=True,
+            )
+            for key in dfs:
+                df = dfs[key]
+                dfs[key] = df.with_columns([
+                    pl.cumsum('size').alias('cumsize'),
+                ])

-        # filter out to the columns matching values filter passed
-        # as input.
-        # if filter_by_ids:
-        #     for col, vals in filter_by_ids.items():
-        #         str_vals = set(map(str, vals))
-        #         pred: pl.Expr = pl.col(col).eq(str_vals.pop())
-        #         for val in str_vals:
-        #             pred |= pl.col(col).eq(val)

-        # fdf = df.filter(pred)

-        # bs_mktid: str = fdf[0]['bs_mktid']
-        # pos: Position = acnt.pps[bs_mktid]

-        # ppt = df.groupby('fqme').agg([
-        #     # TODO: ppu and bep !!
-        #     pl.cumsum('size').alias('cumsum'),
-        # ])

-        dfs: dict[str, pl.DataFrame] = ldf.partition_by(
-            'fqme',
-            as_dict=True,
-        )

-        # for fqme, ppt in act.items():
-        #     ppt.with_columns
-        #     # TODO: ppu and bep !!
-        #     pl.cumsum('size').alias('cumsum'),
-        #     ])

-        yield dfs
+            yield dfs, ledger

diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py
index 30c14704..9dc36b4d 100644
--- a/piker/accounting/cli.py
+++ b/piker/accounting/cli.py
@@ -37,8 +37,8 @@ from ..calc import humanize
 from ..brokers._daemon import broker_init
 from ._ledger import (
     load_ledger,
+    TransactionLedger,
     # open_trade_ledger,
-    # TransactionLedger,
 )
 from .calc import (
     open_ledger_dfs,
@@ -263,20 +263,17 @@ def disect(

     # ledger dfs groupby-partitioned by fqme
     dfs: dict[str, pl.DataFrame]
+    # actual ledger ref filled in with all txns
+    ldgr: TransactionLedger
+
     with open_ledger_dfs(
         brokername,
         account,
-    ) as dfs:
+    ) as (dfs, ldgr):

-        for key in dfs:
-            df = dfs[key]
-            dfs[key] = df.with_columns([
-                pl.cumsum('size').alias('cumsum'),
-            ])
-
-        ppt = dfs[fqme]
+        # look up specific frame for fqme-selected asset
+        df = dfs[fqme]
         assert not df.is_empty()
-        assert not ppt.is_empty()

         # TODO: we REALLY need a better console REPL for this
         # kinda thing..
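
As a sanity check on the bep relation sketched in the new `ppu()` docstring
TODO above, here's a minimal standalone sketch (not part of the patch; fees
and costs are ignored and all names are illustrative) showing that
`bep[-1] = ppu[-1] - (cum_pnl[-1] / cumsize[-1])` prices the remaining
position such that closing it at bep nets zero total pnl:

    from math import copysign

    # toy clears as (price, signed_size); +size == buy
    clears = [
        (100.0, 2.0),   # enter: long 2 @ 100 -> ppu = 100
        (110.0, 2.0),   # add:   long 4 @ avg 105 -> ppu = 105
        (120.0, -2.0),  # exit half: realize (120 - 105) * 2 = 30
    ]

    cumsize = 0.0  # cumulative signed dst asset size
    ppu = 0.0      # price-per-unit (avg cost basis)
    cum_pnl = 0.0  # pnl realized by position-exiting clears

    for price, size in clears:
        if cumsize * size >= 0:
            # position-entering clear: dilutes the avg entry price
            ppu = (ppu * cumsize + price * size) / (cumsize + size)
        else:
            # position-exiting clear: ppu unchanged, pnl realizes
            cum_pnl += (price - ppu) * abs(size) * copysign(1, cumsize)
        cumsize += size

    # break even price for the remaining open size (cumsize != 0)
    bep = ppu - (cum_pnl / cumsize)
    assert (bep - ppu) * cumsize + cum_pnl == 0  # close @ bep -> flat
    print(ppu, cumsize, cum_pnl, bep)  # -> 105.0 2.0 30.0 90.0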
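The df-conversion that now lives in `open_ledger_dfs()` can also be
exercised in isolation. The records below are made-up stand-ins for
`txn.to_dict()` output, but the `.sort()` / `.with_columns()` chain mirrors
the patch (assuming, as the diff itself does, a `polars` version providing
`Expr.str.to_datetime()`):

    import polars as pl

    # hypothetical txn records; real ones come from `ledger.to_txns()`
    records = [
        {'fqme': 'xbtusd.kraken', 'dt': '2023-07-14T10:01:00',
         'size': -1.0, 'price': 30250.0},
        {'fqme': 'xbtusd.kraken', 'dt': '2023-07-13T09:00:00',
         'size': 2.0, 'price': 29900.0},
    ]

    ldf = pl.DataFrame(records).sort('dt').with_columns([
        # whenever the `dt` field is str-typed, parse it to a proper
        # datetime, as the new `.with_columns()` call above does
        pl.col('dt').str.to_datetime(),
    ])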
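Continuing that sketch, the fqme-partitioned cum-size calc which `disect`
previously did inline reduces per-partition to the same two calls the patch
adds to the converter, so the CLI now just destructures `(dfs, ldgr)` and
indexes `dfs[fqme]` directly:

    dfs: dict[str, pl.DataFrame] = ldf.partition_by(
        'fqme',
        as_dict=True,
    )
    for key in dfs:
        # running (signed) position size per market: the `cumsize`
        # column that the `disect` cmd used to compute itself
        dfs[key] = dfs[key].with_columns([
            pl.cumsum('size').alias('cumsize'),
        ])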