Move cum-calcs to `open_ledger_dfs()`, always parse `str`->`Datetime`

Previously the cum-size calc(s) was in the `disect` CLI but it's better
stuffed into the backing df converter. Also, ensure that whenever
a `dt` field is type-detected as a `str` we parse it to `DateTime`.
account_tests
Tyler Goodlet 2023-07-15 15:43:09 -04:00
parent 69314e9fca
commit 3d20490ee5
2 changed files with 117 additions and 97 deletions

View File

@ -20,6 +20,7 @@ you know when you're losing money (if possible) XD
'''
from __future__ import annotations
from collections.abc import ValuesView
from contextlib import contextmanager as cm
from math import copysign
from typing import (
@ -39,6 +40,7 @@ from pendulum import (
if TYPE_CHECKING:
from ._ledger import (
Transaction,
TransactionLedger,
)
def ppu(
@ -72,18 +74,23 @@ def ppu(
where `cost_basis` for the current step is simply the price
* size of the most recent clearing transaction.
-----
TODO: get the BEP computed and working similarly!
-----
the equivalent "break even price" or bep at each new clear
event step conversely only changes when an "position exiting
clear" which **decreases** the cumulative dst asset size:
bep[-1] = ppu[-1] - (cum_pnl[-1] / cumsize[-1])
'''
asize_h: list[float] = [] # historical accumulative size
ppu_h: list[float] = [] # historical price-per-unit
ledger: dict[str, dict] = {}
# entry: dict[str, Any] | Transaction
t: Transaction
for t in clears:
# tid: str = entry['tid']
# clear_size = entry['size']
clear_size: float = t.size
# clear_price: str | float = entry['price']
clear_price: str | float = t.price
is_clear: bool = not isinstance(clear_price, str)
@ -152,7 +159,6 @@ def ppu(
clear_price * abs(clear_size)
+
# transaction cost
# accum_sign * cost_scalar * entry['cost']
accum_sign * cost_scalar * t.cost
)
@ -187,13 +193,13 @@ def ppu(
asize_h.append(accum_size)
# ledger[t.tid] = {
# 'tx': t,
# 'txn': t,
ledger[t.tid] = t.to_dict() | {
'ppu': ppu,
'cumsize': accum_size,
'sign_change': sign_change,
# TODO: cumpnl, bep
# TODO: cum_pnl, bep
}
final_ppu = ppu_h[-1] if ppu_h else 0
@ -212,6 +218,7 @@ def ppu(
def iter_by_dt(
records: (
dict[str, dict[str, Any]]
| ValuesView[dict] # eg. `Position._events.values()`
| list[dict]
| list[Transaction] # XXX preferred!
),
@ -220,7 +227,7 @@ def iter_by_dt(
# so if you know that the record stats show some field
# is more common then others, stick it at the top B)
parsers: dict[str, Callable | None] = {
'dt': None, # parity case
'dt': parse, # parity case
'datetime': parse, # datetime-str
'time': from_timestamp, # float epoch
},
@ -259,8 +266,13 @@ def iter_by_dt(
# the `parsers` table above (when NOT using
# `.get()`), otherwise pass through the value and
# sort on it directly
parser: Callable | None = parsers[k]
return parser(v) if (parser is not None) else v
if (
not isinstance(v, DateTime)
and (parser := parsers.get(k))
):
return parser(v)
else:
return v
else:
# XXX: should never get here..
@ -271,6 +283,7 @@ def iter_by_dt(
records,
key=key or dyn_parse_to_dt,
):
# NOTE the type sig above; either pairs or txns B)
yield entry
@ -331,7 +344,14 @@ def open_ledger_dfs(
brokername: str,
acctname: str,
) -> dict[str, pl.DataFrame]:
ledger: TransactionLedger | None = None,
**kwargs,
) -> tuple[
dict[str, pl.DataFrame],
TransactionLedger,
]:
'''
Open a ledger of trade records (presumably from some broker
backend), normalize the records into `Transactions` via the
@ -341,86 +361,89 @@ def open_ledger_dfs(
'''
from ._ledger import (
open_trade_ledger,
# Transaction,
TransactionLedger,
)
ledger: TransactionLedger
import time
now = time.time()
with (
open_trade_ledger(
brokername,
acctname,
rewrite=True,
allow_from_sync_code=True,
) as ledger,
):
if not ledger:
raise ValueError(f'No ledger for {acctname}@{brokername} exists?')
if not ledger:
import time
from tractor._debug import open_crash_handler
now = time.time()
print(f'LEDGER LOAD TIME: {time.time() - now}')
# process raw TOML ledger into txns using the
# appropriate backend normalizer.
# cache: AssetsInfo = get_symcache(
# brokername,
# allow_reload=True,
# )
with (
open_crash_handler(),
txns: dict[str, Transaction]
if acctname != 'paper':
txns = ledger.mod.norm_trade_records(ledger)
else:
txns = ledger.to_txns()
open_trade_ledger(
brokername,
acctname,
rewrite=True,
allow_from_sync_code=True,
ldf = pl.DataFrame(
list(txn.to_dict() for txn in txns.values()),
# schema=[
# ('tid', str),
# ('fqme', str),
# ('dt', str),
# ('size', pl.Float64),
# ('price', pl.Float64),
# ('cost', pl.Float64),
# ('expiry', str),
# ('bs_mktid', str),
# ],
).sort('dt').select([
pl.col('fqme'),
pl.col('dt').str.to_datetime(),
# pl.col('expiry').dt.datetime(),
pl.col('bs_mktid'),
pl.col('size'),
pl.col('price'),
# proxied through from caller
**kwargs,
) as ledger,
):
if not ledger:
raise ValueError(f'No ledger for {acctname}@{brokername} exists?')
print(f'LEDGER LOAD TIME: {time.time() - now}')
# process raw TOML ledger into txns using the
# appropriate backend normalizer.
# cache: AssetsInfo = get_symcache(
# brokername,
# allow_reload=True,
# )
txns: dict[str, Transaction] = ledger.to_txns()
ldf = pl.DataFrame(
list(txn.to_dict() for txn in txns.values()),
# schema=[
# ('tid', str),
# ('fqme', str),
# ('dt', str),
# ('size', pl.Float64),
# ('price', pl.Float64),
# ('cost', pl.Float64),
# ('expiry', str),
# ('bs_mktid', str),
# ],
# ).sort('dt').select([
).sort('dt').with_columns([
# pl.col('fqme'),
pl.col('dt').str.to_datetime(),
# pl.col('expiry').dt.datetime(),
# pl.col('bs_mktid'),
# pl.col('size'),
# pl.col('price'),
])
# filter out to the columns matching values filter passed
# as input.
# if filter_by_ids:
# for col, vals in filter_by_ids.items():
# str_vals = set(map(str, vals))
# pred: pl.Expr = pl.col(col).eq(str_vals.pop())
# for val in str_vals:
# pred |= pl.col(col).eq(val)
# fdf = df.filter(pred)
# bs_mktid: str = fdf[0]['bs_mktid']
# pos: Position = acnt.pps[bs_mktid]
# TODO: not sure if this is even possible but..
# ppt = df.groupby('fqme').agg([
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
dfs: dict[str, pl.DataFrame] = ldf.partition_by(
'fqme',
as_dict=True,
)
for key in dfs:
df = dfs[key]
dfs[key] = df.with_columns([
pl.cumsum('size').alias('cumsize'),
])
# filter out to the columns matching values filter passed
# as input.
# if filter_by_ids:
# for col, vals in filter_by_ids.items():
# str_vals = set(map(str, vals))
# pred: pl.Expr = pl.col(col).eq(str_vals.pop())
# for val in str_vals:
# pred |= pl.col(col).eq(val)
# fdf = df.filter(pred)
# bs_mktid: str = fdf[0]['bs_mktid']
# pos: Position = acnt.pps[bs_mktid]
# ppt = df.groupby('fqme').agg([
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
dfs: dict[str, pl.DataFrame] = ldf.partition_by(
'fqme',
as_dict=True,
)
# for fqme, ppt in act.items():
# ppt.with_columns
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
yield dfs
yield dfs, ledger

View File

@ -37,8 +37,8 @@ from ..calc import humanize
from ..brokers._daemon import broker_init
from ._ledger import (
load_ledger,
TransactionLedger,
# open_trade_ledger,
# TransactionLedger,
)
from .calc import (
open_ledger_dfs,
@ -263,20 +263,17 @@ def disect(
# ledger dfs groupby-partitioned by fqme
dfs: dict[str, pl.DataFrame]
# actual ledger ref filled in with all txns
ldgr: TransactionLedger
with open_ledger_dfs(
brokername,
account,
) as dfs:
) as (dfs, ldgr):
for key in dfs:
df = dfs[key]
dfs[key] = df.with_columns([
pl.cumsum('size').alias('cumsum'),
])
ppt = dfs[fqme]
# look up specific frame for fqme-selected asset
df = dfs[fqme]
assert not df.is_empty()
assert not ppt.is_empty()
# TODO: we REALLY need a better console REPL for this
# kinda thing..