Moar `.accounting` tweaks

- start flipping over internals to `Position.cumsize`
- allow passing in a `_mktmap_table` to `Account.update_from_ledger()`
  for cases where the caller wants to per-call-dyamically insert the
  `MktPair` via a one-off table (cough IB).
- use `polars.from_dicts()` in `.calc.open_ledger_dfs()`. and wrap the
  whole func in a new `toolz.open_crash_handler()`.
account_tests
Tyler Goodlet 2023-07-21 23:48:53 -04:00
parent 759ebe71e9
commit 897c20bd4a
3 changed files with 54 additions and 33 deletions

View File

@ -324,7 +324,7 @@ class Position(Struct):
(fiat) units. (fiat) units.
''' '''
return self.ppu * self.size return self.ppu * self.cumsize
def expired(self) -> bool: def expired(self) -> bool:
''' '''
@ -483,6 +483,8 @@ class Account(Struct):
cost_scalar: float = 2, cost_scalar: float = 2,
symcache: SymbologyCache | None = None, symcache: SymbologyCache | None = None,
_mktmap_table: dict[str, MktPair] | None = None,
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
Update the internal `.pps[str, Position]` table from input Update the internal `.pps[str, Position]` table from input
@ -519,7 +521,14 @@ class Account(Struct):
# template the mkt-info presuming a legacy market ticks # template the mkt-info presuming a legacy market ticks
# if no info exists in the transactions.. # if no info exists in the transactions..
try:
mkt: MktPair = symcache.mktmaps[fqme] mkt: MktPair = symcache.mktmaps[fqme]
except KeyError:
# XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is
# being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..)
mkt = _mktmap_table[fqme]
if not (pos := pps.get(bs_mktid)): if not (pos := pps.get(bs_mktid)):

View File

@ -361,8 +361,8 @@ def open_ledger_dfs(
if not ledger: if not ledger:
import time import time
from tractor._debug import open_crash_handler from tractor._debug import open_crash_handler
now = time.time()
now = time.time()
with ( with (
open_crash_handler(), open_crash_handler(),
@ -390,26 +390,29 @@ def open_ledger_dfs(
# ) # )
txns: dict[str, Transaction] = ledger.to_txns() txns: dict[str, Transaction] = ledger.to_txns()
ldf = pl.DataFrame( # ldf = pl.DataFrame(
# list(txn.to_dict() for txn in txns.values()),
ldf = pl.from_dicts(
list(txn.to_dict() for txn in txns.values()), list(txn.to_dict() for txn in txns.values()),
# schema=[
# ('tid', str), # only for ordering the cols
# ('fqme', str), schema=[
# ('dt', str), ('fqme', str),
# ('size', pl.Float64), ('tid', str),
# ('price', pl.Float64), ('bs_mktid', str),
# ('cost', pl.Float64), ('expiry', str),
# ('expiry', str), ('etype', str),
# ('bs_mktid', str), ('dt', str),
# ], ('size', pl.Float64),
# ).sort('dt').select([ ('price', pl.Float64),
).sort('dt').with_columns([ ('cost', pl.Float64),
# pl.col('fqme'), ],
).sort( # chronological order
'dt'
).with_columns([
pl.col('dt').str.to_datetime(), pl.col('dt').str.to_datetime(),
# pl.col('expiry').dt.datetime(), # pl.col('expiry').str.to_datetime(),
# pl.col('bs_mktid'), # pl.col('expiry').dt.date(),
# pl.col('size'),
# pl.col('price'),
]) ])
# filter out to the columns matching values filter passed # filter out to the columns matching values filter passed
@ -423,20 +426,24 @@ def open_ledger_dfs(
# fdf = df.filter(pred) # fdf = df.filter(pred)
# bs_mktid: str = fdf[0]['bs_mktid'] # break up into a frame per mkt / fqme
# pos: Position = acnt.pps[bs_mktid]
# TODO: not sure if this is even possible but..
# ppt = df.groupby('fqme').agg([
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
dfs: dict[str, pl.DataFrame] = ldf.partition_by( dfs: dict[str, pl.DataFrame] = ldf.partition_by(
'fqme', 'fqme',
as_dict=True, as_dict=True,
) )
# TODO: not sure if this is even possible but..
# - it'd be more ideal to use `ppt = df.groupby('fqme').agg([`
# - ppu and bep calcs!
for key in dfs: for key in dfs:
df = dfs[key] df = dfs[key]
# TODO: pass back the current `Position` object loaded from
# the account as well? Would provide incentive to do all
# this ledger loading inside a new async open_account().
# bs_mktid: str = df[0]['bs_mktid']
# pos: Position = acnt.pps[bs_mktid]
dfs[key] = df.with_columns([ dfs[key] = df.with_columns([
pl.cumsum('size').alias('cumsize'), pl.cumsum('size').alias('cumsize'),
]) ])

View File

@ -253,6 +253,7 @@ def disect(
), ),
): ):
from piker.log import get_console_log from piker.log import get_console_log
from piker.toolz import open_crash_handler
get_console_log(loglevel) get_console_log(loglevel)
pair: tuple[str, str] pair: tuple[str, str]
@ -266,10 +267,14 @@ def disect(
# actual ledger ref filled in with all txns # actual ledger ref filled in with all txns
ldgr: TransactionLedger ldgr: TransactionLedger
with open_ledger_dfs( pl.Config.set_tbl_cols(16)
with (
open_crash_handler(),
open_ledger_dfs(
brokername, brokername,
account, account,
) as (dfs, ldgr): ) as (dfs, ldgr),
):
# look up specific frame for fqme-selected asset # look up specific frame for fqme-selected asset
df = dfs[fqme] df = dfs[fqme]