Call `open_ledger_dfs()` for `disect` sub-cmd

Drop all the old `polars` (groupby + agg related) mangling to get a df
per fqme by delegating to the new routine and add in the `.cumsum()`ing
(per frame) as a first start on computing pps using dfs instead of
python dicts + loops as in `ppu()`.
account_tests
Tyler Goodlet 2023-07-10 09:13:59 -04:00
parent 8f1983fd8e
commit 3704e2ceac
1 changed files with 26 additions and 28 deletions

View File

@ -40,10 +40,8 @@ from ._ledger import (
# open_trade_ledger,
# TransactionLedger,
)
from ._pos import (
PpTable,
load_pps_from_ledger,
# load_account,
from .calc import (
open_ledger_dfs,
)
@ -241,8 +239,10 @@ def disect(
# "fully_qualified_account_name"
fqan: str,
fqme: str, # for ib
pdb: bool = False,
# TODO: in tractor we should really have
# a debug_mode ctx for wrapping any kind of code no?
pdb: bool = False,
bs_mktid: str = typer.Option(
None,
"-bid",
@ -252,34 +252,32 @@ def disect(
"-l",
),
):
from piker.log import get_console_log
get_console_log(loglevel)
pair: tuple[str, str]
if not (pair := unpack_fqan(fqan)):
raise ValueError('{fqan} malformed!?')
brokername, account = pair
# ledger: TransactionLedger
# records: dict[str, dict]
table: PpTable
df: pl.DataFrame # legder df
ppt: pl.DataFrame # piker position table
df, ppt, table = load_pps_from_ledger(
# ledger dfs groupby-partitioned by fqme
dfs: dict[str, pl.DataFrame]
with open_ledger_dfs(
brokername,
account,
filter_by_ids={'fqme': [fqme]},
)
# sers = [
# pl.Series(e['fqme'], e['cumsum'])
# for e in ppt.to_dicts()
# ]
# ppt_by_id: pl.DataFrame = ppt.filter(
# pl.col('fqme') == fqme,
# )
) as dfs:
for key in dfs:
df = dfs[key]
dfs[key] = df.with_columns([
pl.cumsum('size').alias('cumsum'),
])
ppt = dfs[fqme]
assert not df.is_empty()
assert not ppt.is_empty()
# TODO: we REALLY need a better console REPL for this
# kinda thing..
breakpoint()
# with open_trade_ledger(
# brokername,
# account,
# ) as ledger:
# for tid, rec in ledger.items():
# bs_mktid: str = rec['bs_mktid']