Rework `.accounting.Position` calcs to prep for `polars`

We're probably going to move to implementing all accounting using
`polars.DataFrame` and friends; this rejig thus preps for a much more
"stateless" implementation of our `Position` type and its internal
pos-accounting metrics: `ppu` and `cumsize`.
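
For a sense of where this is headed, both metrics should eventually
fall out of a single expression chain over the dt-sorted clears table.
A minimal sketch of the idea (toy data only; the column names are
assumptions, not a final schema):

    import polars as pl

    # toy clears table: two entry clears then a partial exit
    df = pl.DataFrame({
        'dt': ['2023-07-01T09:30:00', '2023-07-01T10:00:00', '2023-07-01T11:00:00'],
        'size': [10.0, 10.0, -5.0],
        'price': [100.0, 110.0, 115.0],
    })

    # `cumsize` is just a running sum over the clears, hence the
    # rename away from `.accum_size` (see summary below):
    df = df.with_columns(
        pl.col('size').cumsum().alias('cumsize'),
    )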

Summary:
- wrt `._pos.Position`:
  - rename `.size`/`.accum_size` to `.cumsize` to be more in line
    with `polars.DataFrame.cumsum()`.
  - make `Position.expiry` delegate to the underlying `.mkt: MktPair`
    handling (hopefully) all edge cases..
  - change over to a new `._events: dict[str, Transaction]` in prep
    for #510 (and friends) and enforce a new `Transaction.etype: str`
    which is by default `clear`.
  - add `.iter_by_type()` which iterates, filters and sorts the
    entries in `._events` from above.
  - add `Position.clearsdict()` which returns the dict-ified and
    datetime-sorted table which can more-or-less be stored in the
    toml account file.
  - add `.minimized_clears()`, a new (and close) version of the old
    method, which always grabs at least one clear before
    a position-side-polarity-change.
  - mask-drop `.ensure_state()` since there are no more `.size`/`.price`
    state vars (per se); we always re-calc the ppu and cumsize from
    the clears records on every read (see the sketch after this list).
  - `.add_clear()` no longer does bisect-insorting since all sorting is
    done on position property *reads*.
  - move the PPU (price per unit) calculator to a new `.accounting.calc`
    module, as well as move in `iter_by_dt()`, the datetime-sorted
    clearing-transaction iterator.
    - also fix it to handle lists of `Transaction` as well as `dict`s
      as before.
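
To illustrate the "stateless" read-semantics from the list above,
here's a rough stand-in sketch; this is NOT the actual
`._pos.Position` implementation, only the `calc` and `_ledger`
imports are real per this commit:

    from piker.accounting._ledger import Transaction
    from piker.accounting.calc import ppu, iter_by_dt

    class StatelessPosLike:
        '''
        Hypothetical minimal stand-in for `._pos.Position`.

        '''
        def __init__(self) -> None:
            self._events: dict[str, Transaction] = {}

        def add_clear(self, t: Transaction) -> None:
            # no bisect-insorting; ordering is applied on reads
            self._events[t.tid] = t

        @property
        def cumsize(self) -> float:
            # always re-derived from the clears table on read
            return sum(
                t.size
                for t in iter_by_dt(list(self._events.values()))
                if t.etype == 'clear'
            )

        @property
        def ppu(self) -> float:
            return ppu(iter_by_dt(list(self._events.values())))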

- start rename of `PpTable` -> `Account` and make a note about adding
  a `.balances` table.
- always `float()` the transaction size/price values since it seems if
  they get processed as `tomlkit.Integer` there's some suuper weird
  double negative on read-then-write to the clears table?
  - something like `cumsize = -1` -> `cumsize = --1` !?!?
  - see the hedged coercion sketch below this list.
- make `load_pps_from_ledger()` work again, now including some very
  first-draft `polars` df processing from a transaction ledger.
  - use this from the `accounting.cli.disect` subcmd which is also in
    *super early draft* mode ;)
- obviously as mentioned in the `Position` section, add the new `.calc`
  module with a `.ppu()` calculator func B)
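
As a hedged aside on the `tomlkit` weirdness above: the exact
double-negative failure mode wasn't pinned down, so the fix is just a
defensive coercion. The guard looks something like:

    import tomlkit

    doc = tomlkit.parse("cumsize = -1")
    val = doc['cumsize']  # a `tomlkit.Integer`, not a plain `int`

    # always `float()` before any read-then-write round-trip
    doc['cumsize'] = float(val)
    print(tomlkit.dumps(doc))  # -> cumsize = -1.0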
branch: account_tests
Tyler Goodlet 2023-07-03 18:52:02 -04:00
parent 745c144314
commit 05af2b3e64
6 changed files with 707 additions and 507 deletions

piker/accounting/__init__.py

@@ -21,8 +21,10 @@ for tendiez.
 '''
 from ..log import get_logger
+from .calc import (
+    iter_by_dt,
+)
 from ._ledger import (
-    iter_by_dt,
     Transaction,
     TransactionLedger,
     open_trade_ledger,
@@ -100,20 +102,3 @@ def get_likely_pair(
     likely_dst = bs_mktid[:src_name_start]
     if likely_dst == dst:
         return bs_mktid
-
-
-if __name__ == '__main__':
-    import sys
-    from pprint import pformat
-
-    args = sys.argv
-    assert len(args) > 1, 'Specifiy account(s) from `brokers.toml`'
-    args = args[1:]
-    for acctid in args:
-        broker, name = acctid.split('.')
-        trans, updated_pps = load_pps_from_ledger(broker, name)
-        print(
-            f'Processing transactions into pps for {broker}:{acctid}\n'
-            f'{pformat(trans)}\n\n'
-            f'{pformat(updated_pps)}'
-        )
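
(The ad-hoc `__main__` runner removed above is superseded by the
`disect` subcmd in the last file of this diff; a call-level sketch
with made-up broker/account/fqme values, matching the new 3-tuple
return shown there:)

    import polars as pl
    from piker.accounting import load_pps_from_ledger

    df: pl.DataFrame   # dt-sorted ledger frame
    ppt: pl.DataFrame  # piker position table
    df, ppt, table = load_pps_from_ledger(
        'ib',     # hypothetical brokername
        'paper',  # hypothetical account name
        filter_by_ids={'fqme': ['mnq.cme.ib']},  # hypothetical fqme
    )
    assert not df.is_empty()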

piker/accounting/_allocate.py

@@ -118,9 +118,9 @@ class Allocator(Struct):
         ld: int = mkt.size_tick_digits
         size_unit = self.size_unit
-        live_size = live_pp.size
+        live_size = live_pp.cumsize
         abs_live_size = abs(live_size)
-        abs_startup_size = abs(startup_pp.size)
+        abs_startup_size = abs(startup_pp.cumsize)

         u_per_slot, currency_per_slot = self.step_sizes()

@@ -213,8 +213,6 @@ class Allocator(Struct):
             slots_used = self.slots_used(
                 Position(
                     mkt=mkt,
-                    size=order_size,
-                    ppu=price,
                     bs_mktid=mkt.bs_mktid,
                 )
             )

@@ -241,7 +239,7 @@ class Allocator(Struct):
         Calc and return the number of slots used by this ``Position``.

         '''
-        abs_pp_size = abs(pp.size)
+        abs_pp_size = abs(pp.cumsize)

         if self.size_unit == 'currency':
             # live_currency_size = size or (abs_pp_size * pp.ppu)

piker/accounting/_ledger.py

@@ -25,15 +25,12 @@ from pathlib import Path
 from typing import (
     Any,
     Callable,
-    Iterator,
-    Union,
     Generator
 )

 from pendulum import (
     datetime,
     DateTime,
-    from_timestamp,
     parse,
 )

 import tomli_w  # for fast ledger writing
@@ -41,6 +38,9 @@ import tomli_w  # for fast ledger writing
 from .. import config
 from ..data.types import Struct
 from ..log import get_logger
+from .calc import (
+    iter_by_dt,
+)
 from ._mktinfo import (
     Symbol,  # legacy
     MktPair,
@@ -56,13 +56,14 @@ class Transaction(Struct, frozen=True):
     # once we have that as a required field,
     # we don't really need the fqme any more..
     fqme: str
-    tid: Union[str, int]  # unique transaction id
+    tid: str | int  # unique transaction id
     size: float
     price: float
     cost: float  # commisions or other additional costs
     dt: datetime
+    etype: str = 'clear'

     # TODO: we can drop this right since we
     # can instead expect the backend to provide this
     # via the `MktPair`?
@@ -159,9 +160,9 @@ class TransactionLedger(UserDict):
             # and instead call it for each entry incrementally:
             # normer = mod.norm_trade_record(txdict)

-            # TODO: use tx_sort here yah?
+            # datetime-sort and pack into txs
             for txdict in self.tx_sort(self.data.values()):
-            # for tid, txdict in self.data.items():

                 # special field handling for datetimes
                 # to ensure pendulum is used!
                 tid: str = txdict['tid']
@@ -186,6 +187,7 @@ class TransactionLedger(UserDict):
                 # TODO: change to .sys!
                 sym=mkt,
                 expiry=parse(expiry) if expiry else None,
+                etype='clear',
             )

             yield tid, tx
@@ -208,62 +210,26 @@ class TransactionLedger(UserDict):
         Render the self.data ledger dict to it's TOML file form.

         '''
-        cpy = self.data.copy()
         towrite: dict[str, Any] = {}
-        for tid, trans in cpy.items():
+        for tid, txdict in self.tx_sort(self.data.copy()):

-            # drop key for non-expiring assets
-            txdict = towrite[tid] = self.data[tid]
+            # write blank-str expiry for non-expiring assets
             if (
                 'expiry' in txdict
                 and txdict['expiry'] is None
             ):
-                txdict.pop('expiry')
+                txdict['expiry'] = ''

             # re-write old acro-key
-            fqme = txdict.get('fqsn')
-            if fqme:
+            if fqme := txdict.get('fqsn'):
                 txdict['fqme'] = fqme

+            towrite[tid] = txdict
+
         with self.file_path.open(mode='wb') as fp:
             tomli_w.dump(towrite, fp)


-def iter_by_dt(
-    records: dict[str, dict[str, Any]] | list[dict],
-
-    # NOTE: parsers are looked up in the insert order
-    # so if you know that the record stats show some field
-    # is more common then others, stick it at the top B)
-    parsers: dict[tuple[str], Callable] = {
-        'dt': None,  # parity case
-        'datetime': parse,  # datetime-str
-        'time': from_timestamp,  # float epoch
-    },
-    key: Callable | None = None,
-
-) -> Iterator[tuple[str, dict]]:
-    '''
-    Iterate entries of a ``records: dict`` table sorted by entry recorded
-    datetime presumably set at the ``'dt'`` field in each entry.
-
-    '''
-    def dyn_parse_to_dt(txdict: dict[str, Any]) -> DateTime:
-        k, v, parser = next(
-            (k, txdict[k], parsers[k]) for k in parsers if k in txdict
-        )
-        return parser(v) if parser else v
-
-    if isinstance(records, dict):
-        records = records.values()
-
-    for entry in sorted(
-        records,
-        key=key or dyn_parse_to_dt,
-    ):
-        yield entry


 def load_ledger(
     brokername: str,
     acctid: str,

piker/accounting/_pos.py: file diff suppressed because it is too large

piker/accounting/calc.py (new file)

@@ -0,0 +1,276 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Calculation routines for balance and position tracking such that
you know when you're losing money (if possible) XD

'''
from __future__ import annotations
from math import copysign
from typing import (
    Any,
    Callable,
    Iterator,
    TYPE_CHECKING,
)

from pendulum import (
    # datetime,
    DateTime,
    from_timestamp,
    parse,
)

if TYPE_CHECKING:
    from ._ledger import (
        Transaction,
    )


def ppu(
    clears: Iterator[Transaction],

    # include transaction cost in breakeven price
    # and presume the worst case of the same cost
    # to exit this transaction (even though in reality
    # it will be dynamic based on exit strategy).
    cost_scalar: float = 2,

    # return the ledger of clears as a (now dt sorted) dict with
    # new position fields inserted alongside each entry.
    as_ledger: bool = False,

) -> float:
    '''
    Compute the "price-per-unit" price for the given non-zero sized
    rolling position.

    The recurrence relation which computes this (exponential) mean
    per new clear which **increases** the accumulative position size
    is:

    ppu[-1] = (
        ppu[-2] * accum_size[-2]
        +
        ppu[-1] * size
    ) / accum_size[-1]

    where `cost_basis` for the current step is simply the price
    * size of the most recent clearing transaction.

    '''
    asize_h: list[float] = []  # historical accumulative size
    ppu_h: list[float] = []  # historical price-per-unit
    ledger: dict[str, dict] = {}

    # entry: dict[str, Any] | Transaction
    t: Transaction
    for t in clears:
        # tid: str = entry['tid']
        # clear_size = entry['size']
        clear_size: float = t.size
        # clear_price: str | float = entry['price']
        clear_price: str | float = t.price
        is_clear: bool = not isinstance(clear_price, str)

        last_accum_size = asize_h[-1] if asize_h else 0
        accum_size = last_accum_size + clear_size
        accum_sign = copysign(1, accum_size)

        sign_change: bool = False

        if accum_size == 0:
            ppu_h.append(0)
            asize_h.append(0)
            continue

        # on transfers we normally write some non-valid
        # price since withdrawal to another account/wallet
        # has nothing to do with inter-asset-market prices.
        # TODO: this should be better handled via a `type: 'tx'`
        # field as per existing issue surrounding all this:
        # https://github.com/pikers/piker/issues/510
        if isinstance(clear_price, str):
            # TODO: we can't necessarily have this commit to
            # the overall pos size since we also need to
            # include other positions contributions to this
            # balance or we might end up with a -ve balance for
            # the position..
            continue

        # test if the pp somehow went "passed" a net zero size state
        # resulting in a change of the "sign" of the size (+ve for
        # long, -ve for short).
        sign_change = (
            copysign(1, last_accum_size) + accum_sign == 0
            and last_accum_size != 0
        )

        # since we passed the net-zero-size state the new size
        # after sum should be the remaining size the new
        # "direction" (aka, long vs. short) for this clear.
        if sign_change:
            clear_size = accum_size
            abs_diff = abs(accum_size)
            asize_h.append(0)
            ppu_h.append(0)

        else:
            # old size minus the new size gives us size diff with
            # +ve -> increase in pp size
            # -ve -> decrease in pp size
            abs_diff = abs(accum_size) - abs(last_accum_size)

        # XXX: LIFO breakeven price update. only an increase in size
        # of the position contributes the breakeven price,
        # a decrease does not (i.e. the position is being made
        # smaller).
        # abs_clear_size = abs(clear_size)
        abs_new_size: float | int = abs(accum_size)

        if (
            abs_diff > 0
            and is_clear
        ):
            cost_basis = (
                # cost basis for this clear
                clear_price * abs(clear_size)
                +
                # transaction cost
                # accum_sign * cost_scalar * entry['cost']
                accum_sign * cost_scalar * t.cost
            )

            if asize_h:
                size_last = abs(asize_h[-1])
                cb_last = ppu_h[-1] * size_last
                ppu = (cost_basis + cb_last) / abs_new_size

            else:
                ppu = cost_basis / abs_new_size

            # ppu_h.append(ppu)
            # asize_h.append(accum_size)

        else:
            # TODO: for PPU we should probably handle txs out
            # (aka withdrawals) similarly by simply not having
            # them contrib to the running PPU calc and only
            # when the next entry clear comes in (which will
            # then have a higher weighting on the PPU).

            # on "exit" clears from a given direction,
            # only the size changes not the price-per-unit
            # need to be updated since the ppu remains constant
            # and gets weighted by the new size.
            ppu: float = ppu_h[-1]  # set to previous value

            # ppu_h.append(ppu_h[-1])
            # asize_h.append(accum_size)

        # extend with new rolling metric for this step
        ppu_h.append(ppu)
        asize_h.append(accum_size)

        # ledger[t.tid] = {
        #     'tx': t,
        ledger[t.tid] = t.to_dict() | {
            'ppu': ppu,
            'cumsize': accum_size,
            'sign_change': sign_change,

            # TODO: cumpnl, bep
        }

    final_ppu = ppu_h[-1] if ppu_h else 0

    # TODO: once we have etypes in all ledger entries..
    # handle any split info entered (for now) manually by user
    # if self.split_ratio is not None:
    #     final_ppu /= self.split_ratio

    if as_ledger:
        return ledger

    else:
        return final_ppu


def iter_by_dt(
    records: (
        dict[str, dict[str, Any]]
        | list[dict]
        | list[Transaction]  # XXX preferred!
    ),

    # NOTE: parsers are looked up in the insert order
    # so if you know that the record stats show some field
    # is more common than others, stick it at the top B)
    parsers: dict[tuple[str], Callable] = {
        'dt': None,  # parity case
        'datetime': parse,  # datetime-str
        'time': from_timestamp,  # float epoch
    },
    key: Callable | None = None,

) -> Iterator[tuple[str, dict]]:
    '''
    Iterate entries of a transaction table sorted by entry recorded
    datetime presumably set at the ``'dt'`` field in each entry.

    '''
    # isdict: bool = False
    if isinstance(records, dict):
        # isdict: bool = True
        records = list(records.items())

    def dyn_parse_to_dt(
        tx: tuple[str, dict[str, Any]] | Transaction,
    ) -> DateTime:

        # handle `.items()` inputs
        if isinstance(tx, tuple):
            tx = tx[1]

        # dict or tx object?
        isdict: bool = isinstance(tx, dict)

        # get best parser for this record..
        for k in parsers:
            if (
                isdict and k in tx
                or getattr(tx, k, None)
            ):
                v = tx[k] if isdict else tx.dt
                if v is None:
                    breakpoint()

                parser = parsers[k]

                # only call parser on the value if not None from the
                # `parsers` table above, otherwise pass through the value
                # and sort on it directly
                return parser(v) if (parser is not None) else v

        else:
            breakpoint()

    entry: tuple[str, dict] | Transaction
    for entry in sorted(
        records,
        key=key or dyn_parse_to_dt,
    ):
        yield entry
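
(A quick numeric sanity check of the `ppu()` recurrence above; the
`Tx` dataclass is a hypothetical stand-in mimicking only the
`Transaction` fields the calc reads:)

    from dataclasses import dataclass, asdict
    from pendulum import DateTime, now
    from piker.accounting.calc import ppu, iter_by_dt

    @dataclass(frozen=True)
    class Tx:
        tid: str
        size: float
        price: float
        cost: float
        dt: DateTime

        def to_dict(self) -> dict:
            return asdict(self)

    clears = [
        Tx('t1', 10, 100.0, 1.0, now()),                 # buy 10 @ 100
        Tx('t2', 10, 110.0, 1.0, now().add(minutes=1)),  # buy 10 @ 110
    ]
    # each clear's cost is doubled by the default `cost_scalar=2`:
    # ppu = ((10*100 + 2*1) + (10*110 + 2*1)) / 20 = 105.2
    assert abs(ppu(iter_by_dt(clears)) - 105.2) < 1e-9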

piker/accounting/cli.py

@@ -240,9 +240,13 @@ def sync(
 def disect(
     # "fully_qualified_account_name"
     fqan: str,
-    bs_mktid: str,  # for ib
+    fqme: str,  # for ib
     pdb: bool = False,

+    bs_mktid: str = typer.Option(
+        None,
+        "-bid",
+    ),
     loglevel: str = typer.Option(
         'error',
         "-l",
@@ -255,36 +259,24 @@ def disect(
     brokername, account = pair

     # ledger: TransactionLedger
-    records: dict[str, dict]
+    # records: dict[str, dict]
     table: PpTable
-    records, table = load_pps_from_ledger(
+    df: pl.DataFrame  # ledger df
+    ppt: pl.DataFrame  # piker position table
+    df, ppt, table = load_pps_from_ledger(
         brokername,
         account,
-        filter_by_ids={bs_mktid},
+        filter_by_ids={'fqme': [fqme]},
     )
-
-    df = pl.DataFrame(
-        list(records.values()),
-        # schema=[
-        #     ('tid', str),
-        #     ('fqme', str),
-        #     ('dt', str),
-        #     ('size', pl.Float64),
-        #     ('price', pl.Float64),
-        #     ('cost', pl.Float64),
-        #     ('expiry', str),
-        #     ('bs_mktid', str),
-        # ],
-    ).select([
-        pl.col('fqme'),
-        pl.col('dt').str.to_datetime(),
-        # pl.col('expiry').dt.datetime(),
-        pl.col('size'),
-        pl.col('price'),
-    ])
+    # sers = [
+    #     pl.Series(e['fqme'], e['cumsum'])
+    #     for e in ppt.to_dicts()
+    # ]
+    # ppt_by_id: pl.DataFrame = ppt.filter(
+    #     pl.col('fqme') == fqme,
+    # )

     assert not df.is_empty()
     breakpoint()
+    # tractor.pause_from_sync()

     # with open_trade_ledger(
     #     brokername,
     #     account,