From 897c20bd4a013191ede3e6753ca7cda9fcd4c0fb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jul 2023 23:48:53 -0400 Subject: [PATCH] Moar `.accounting` tweaks - start flipping over internals to `Position.cumsize` - allow passing in a `_mktmap_table` to `Account.update_from_ledger()` for cases where the caller wants to per-call-dyamically insert the `MktPair` via a one-off table (cough IB). - use `polars.from_dicts()` in `.calc.open_ledger_dfs()`. and wrap the whole func in a new `toolz.open_crash_handler()`. --- piker/accounting/_pos.py | 13 +++++++-- piker/accounting/calc.py | 61 ++++++++++++++++++++++------------------ piker/accounting/cli.py | 13 ++++++--- 3 files changed, 54 insertions(+), 33 deletions(-) diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 1e159083..4b4b72d6 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -324,7 +324,7 @@ class Position(Struct): (fiat) units. ''' - return self.ppu * self.size + return self.ppu * self.cumsize def expired(self) -> bool: ''' @@ -483,6 +483,8 @@ class Account(Struct): cost_scalar: float = 2, symcache: SymbologyCache | None = None, + _mktmap_table: dict[str, MktPair] | None = None, + ) -> dict[str, Position]: ''' Update the internal `.pps[str, Position]` table from input @@ -519,7 +521,14 @@ class Account(Struct): # template the mkt-info presuming a legacy market ticks # if no info exists in the transactions.. - mkt: MktPair = symcache.mktmaps[fqme] + try: + mkt: MktPair = symcache.mktmaps[fqme] + except KeyError: + # XXX: caller is allowed to provide a fallback + # mktmap table for the case where a new position is + # being added and the preloaded symcache didn't + # have this entry prior (eg. with frickin IB..) + mkt = _mktmap_table[fqme] if not (pos := pps.get(bs_mktid)): diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index ee349092..d86ad98c 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -361,8 +361,8 @@ def open_ledger_dfs( if not ledger: import time from tractor._debug import open_crash_handler - now = time.time() + now = time.time() with ( open_crash_handler(), @@ -390,26 +390,29 @@ def open_ledger_dfs( # ) txns: dict[str, Transaction] = ledger.to_txns() - ldf = pl.DataFrame( + # ldf = pl.DataFrame( + # list(txn.to_dict() for txn in txns.values()), + ldf = pl.from_dicts( list(txn.to_dict() for txn in txns.values()), - # schema=[ - # ('tid', str), - # ('fqme', str), - # ('dt', str), - # ('size', pl.Float64), - # ('price', pl.Float64), - # ('cost', pl.Float64), - # ('expiry', str), - # ('bs_mktid', str), - # ], - # ).sort('dt').select([ - ).sort('dt').with_columns([ - # pl.col('fqme'), + + # only for ordering the cols + schema=[ + ('fqme', str), + ('tid', str), + ('bs_mktid', str), + ('expiry', str), + ('etype', str), + ('dt', str), + ('size', pl.Float64), + ('price', pl.Float64), + ('cost', pl.Float64), + ], + ).sort( # chronological order + 'dt' + ).with_columns([ pl.col('dt').str.to_datetime(), - # pl.col('expiry').dt.datetime(), - # pl.col('bs_mktid'), - # pl.col('size'), - # pl.col('price'), + # pl.col('expiry').str.to_datetime(), + # pl.col('expiry').dt.date(), ]) # filter out to the columns matching values filter passed @@ -423,20 +426,24 @@ def open_ledger_dfs( # fdf = df.filter(pred) - # bs_mktid: str = fdf[0]['bs_mktid'] - # pos: Position = acnt.pps[bs_mktid] - - # TODO: not sure if this is even possible but.. - # ppt = df.groupby('fqme').agg([ - # # TODO: ppu and bep !! - # pl.cumsum('size').alias('cumsum'), - # ]) + # break up into a frame per mkt / fqme dfs: dict[str, pl.DataFrame] = ldf.partition_by( 'fqme', as_dict=True, ) + + # TODO: not sure if this is even possible but.. + # - it'd be more ideal to use `ppt = df.groupby('fqme').agg([` + # - ppu and bep calcs! for key in dfs: df = dfs[key] + + # TODO: pass back the current `Position` object loaded from + # the account as well? Would provide incentive to do all + # this ledger loading inside a new async open_account(). + # bs_mktid: str = df[0]['bs_mktid'] + # pos: Position = acnt.pps[bs_mktid] + dfs[key] = df.with_columns([ pl.cumsum('size').alias('cumsize'), ]) diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index 9dc36b4d..753e6513 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -253,6 +253,7 @@ def disect( ), ): from piker.log import get_console_log + from piker.toolz import open_crash_handler get_console_log(loglevel) pair: tuple[str, str] @@ -266,10 +267,14 @@ def disect( # actual ledger ref filled in with all txns ldgr: TransactionLedger - with open_ledger_dfs( - brokername, - account, - ) as (dfs, ldgr): + pl.Config.set_tbl_cols(16) + with ( + open_crash_handler(), + open_ledger_dfs( + brokername, + account, + ) as (dfs, ldgr), + ): # look up specific frame for fqme-selected asset df = dfs[fqme]