From 05af2b3e643a5ae3663f3d85d61819e3bf4a0258 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 3 Jul 2023 18:52:02 -0400
Subject: [PATCH] Rework `.accounting.Position` calcs to prep for `polars`

We're probably going to move to implementing all accounting using
`polars.DataFrame` and friends and thus this rejig preps for a much
more "stateless" implementation of our `Position` type and its
internal pos-accounting metrics: `ppu` and `cumsize`.

Summary:
- wrt `._pos.Position`:
  - rename `.size`/`.accum_size` to `.cumsize` to be more in line with
    `polars.DataFrame.cumsum()`.
  - make `Position.expiry` delegate to the underlying `.mkt: MktPair`
    handling (hopefully) all edge cases..
  - change over to a new `._events: dict[str, Transaction]` in prep for
    #510 (and friends) and enforce a new `Transaction.etype: str` which
    is by default `clear`.
  - add `.iter_by_type()` which iterates, filters and sorts the entries
    in `._events` from above.
  - add `Position.clearsdict()` which returns the dict-ified and
    datetime-sorted table which can more-or-less be stored in the toml
    account file.
  - add `.minimized_clears()`, a new (and close) version of the old
    method which always grabs at least one clear before
    a position-side-polarity-change.
  - mask-drop `.ensure_state()` since there are no more `.size`/`.price`
    state vars (per se) as we always re-calc the ppu and cumsize from
    the clears records on every read.
  - `.add_clear` no longer does bisect insorting since all sorting is
    done on position property *reads*.
  - move the PPU (price per unit) calculator to a new `.accounting.calc`
    module as well as add in the `iter_by_dt()` clearing transaction
    sorted iterator.
    - also make some fixes to this to handle both lists of `Transaction`
      as well as `dict`s as before.

- start rename of `PpTable` -> `Account` and make a note about adding
  a `.balances` table.

- always `float()` the transaction size/price values since it seems if
  they get processed as `tomlkit.Integer` there's some suuper weird
  double negative on read-then-write to the clears table?
  - something like `cumsize = -1` -> `cumsize = --1` !?!?

- make `load_pps_from_ledger()` work again, but now it includes some
  very, very first-draft `polars` df processing from a transaction
  ledger.
  - use this from the `accounting.cli.disect` subcmd which is also in
    *super early draft* mode ;)

- obviously as mentioned in the `Position` section, add the new `.calc`
  module with a `.ppu()` calculator func B)
---
 piker/accounting/__init__.py  |  21 +-
 piker/accounting/_allocate.py |   8 +-
 piker/accounting/_ledger.py   |  64 +--
 piker/accounting/_pos.py      | 803 +++++++++++++++++-----------------
 piker/accounting/calc.py      | 276 ++++++++++++
 piker/accounting/cli.py       |  42 +-
 6 files changed, 707 insertions(+), 507 deletions(-)
 create mode 100644 piker/accounting/calc.py

diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py
index 707c6600..3246d9c8 100644
--- a/piker/accounting/__init__.py
+++ b/piker/accounting/__init__.py
@@ -21,8 +21,10 @@ for tendiez.
''' from ..log import get_logger -from ._ledger import ( +from .calc import ( iter_by_dt, +) +from ._ledger import ( Transaction, TransactionLedger, open_trade_ledger, @@ -100,20 +102,3 @@ def get_likely_pair( likely_dst = bs_mktid[:src_name_start] if likely_dst == dst: return bs_mktid - - -if __name__ == '__main__': - import sys - from pprint import pformat - - args = sys.argv - assert len(args) > 1, 'Specifiy account(s) from `brokers.toml`' - args = args[1:] - for acctid in args: - broker, name = acctid.split('.') - trans, updated_pps = load_pps_from_ledger(broker, name) - print( - f'Processing transactions into pps for {broker}:{acctid}\n' - f'{pformat(trans)}\n\n' - f'{pformat(updated_pps)}' - ) diff --git a/piker/accounting/_allocate.py b/piker/accounting/_allocate.py index b4345785..deeec498 100644 --- a/piker/accounting/_allocate.py +++ b/piker/accounting/_allocate.py @@ -118,9 +118,9 @@ class Allocator(Struct): ld: int = mkt.size_tick_digits size_unit = self.size_unit - live_size = live_pp.size + live_size = live_pp.cumsize abs_live_size = abs(live_size) - abs_startup_size = abs(startup_pp.size) + abs_startup_size = abs(startup_pp.cumsize) u_per_slot, currency_per_slot = self.step_sizes() @@ -213,8 +213,6 @@ class Allocator(Struct): slots_used = self.slots_used( Position( mkt=mkt, - size=order_size, - ppu=price, bs_mktid=mkt.bs_mktid, ) ) @@ -241,7 +239,7 @@ class Allocator(Struct): Calc and return the number of slots used by this ``Position``. ''' - abs_pp_size = abs(pp.size) + abs_pp_size = abs(pp.cumsize) if self.size_unit == 'currency': # live_currency_size = size or (abs_pp_size * pp.ppu) diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py index 268a81fc..b0061f0a 100644 --- a/piker/accounting/_ledger.py +++ b/piker/accounting/_ledger.py @@ -25,15 +25,12 @@ from pathlib import Path from typing import ( Any, Callable, - Iterator, - Union, Generator ) from pendulum import ( datetime, DateTime, - from_timestamp, parse, ) import tomli_w # for fast ledger writing @@ -41,6 +38,9 @@ import tomli_w # for fast ledger writing from .. import config from ..data.types import Struct from ..log import get_logger +from .calc import ( + iter_by_dt, +) from ._mktinfo import ( Symbol, # legacy MktPair, @@ -56,13 +56,14 @@ class Transaction(Struct, frozen=True): # once we have that as a required field, # we don't really need the fqme any more.. fqme: str - - tid: Union[str, int] # unique transaction id + tid: str | int # unique transaction id size: float price: float cost: float # commisions or other additional costs dt: datetime + etype: str = 'clear' + # TODO: we can drop this right since we # can instead expect the backend to provide this # via the `MktPair`? @@ -159,9 +160,9 @@ class TransactionLedger(UserDict): # and instead call it for each entry incrementally: # normer = mod.norm_trade_record(txdict) - # TODO: use tx_sort here yah? + # datetime-sort and pack into txs for txdict in self.tx_sort(self.data.values()): - # for tid, txdict in self.data.items(): + # special field handling for datetimes # to ensure pendulum is used! tid: str = txdict['tid'] @@ -186,6 +187,7 @@ class TransactionLedger(UserDict): # TODO: change to .sys! sym=mkt, expiry=parse(expiry) if expiry else None, + etype='clear', ) yield tid, tx @@ -208,62 +210,26 @@ class TransactionLedger(UserDict): Render the self.data ledger dict to it's TOML file form. 
''' - cpy = self.data.copy() towrite: dict[str, Any] = {} - for tid, trans in cpy.items(): + for tid, txdict in self.tx_sort(self.data.copy()): - # drop key for non-expiring assets - txdict = towrite[tid] = self.data[tid] + # write blank-str expiry for non-expiring assets if ( 'expiry' in txdict and txdict['expiry'] is None ): - txdict.pop('expiry') + txdict['expiry'] = '' # re-write old acro-key - fqme = txdict.get('fqsn') - if fqme: + if fqme := txdict.get('fqsn'): txdict['fqme'] = fqme + towrite[tid] = txdict + with self.file_path.open(mode='wb') as fp: tomli_w.dump(towrite, fp) -def iter_by_dt( - records: dict[str, dict[str, Any]] | list[dict], - - # NOTE: parsers are looked up in the insert order - # so if you know that the record stats show some field - # is more common then others, stick it at the top B) - parsers: dict[tuple[str], Callable] = { - 'dt': None, # parity case - 'datetime': parse, # datetime-str - 'time': from_timestamp, # float epoch - }, - key: Callable | None = None, - -) -> Iterator[tuple[str, dict]]: - ''' - Iterate entries of a ``records: dict`` table sorted by entry recorded - datetime presumably set at the ``'dt'`` field in each entry. - - ''' - def dyn_parse_to_dt(txdict: dict[str, Any]) -> DateTime: - k, v, parser = next( - (k, txdict[k], parsers[k]) for k in parsers if k in txdict - ) - return parser(v) if parser else v - - if isinstance(records, dict): - records = records.values() - - for entry in sorted( - records, - key=key or dyn_parse_to_dt, - ): - yield entry - - def load_ledger( brokername: str, acctid: str, diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 317a6d6a..65eb67a8 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -22,10 +22,9 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. ''' from __future__ import annotations -from bisect import insort +# from bisect import insort from contextlib import contextmanager as cm from decimal import Decimal -from math import copysign from pprint import pformat from pathlib import Path from typing import ( @@ -34,13 +33,16 @@ from typing import ( Generator ) +import polars as pl import pendulum -from pendulum import datetime, now +from pendulum import ( + datetime, + now, +) import tomlkit from ._ledger import ( Transaction, - iter_by_dt, open_trade_ledger, TransactionLedger, ) @@ -49,6 +51,10 @@ from ._mktinfo import ( Asset, unpack_fqme, ) +from .calc import ( + ppu, + iter_by_dt, +) from .. import config from ..clearing._messages import ( BrokerdPosition, @@ -81,7 +87,7 @@ class Position(Struct): mkt: MktPair # can be +ve or -ve for long/short - size: float + # size: float # "price-per-unit price" above or below which pnl moves above and # below zero for the entirety of the current "trade state". The ppu @@ -89,7 +95,7 @@ class Position(Struct): # in one of a long/short "direction" (i.e. abs(.size_i) > 0 after # the next transaction given .size was > 0 before that tx, and vice # versa for -ve sized positions). - ppu: float + # ppu: float # TODO: break-even-price support! 
# bep: float @@ -103,75 +109,157 @@ class Position(Struct): _clears: list[ dict[str, Any], # transaction history summaries ] = [] - _events: dict[str, dict] = {} - first_clear_dt: datetime | None = None + + # _events: pl.DataFrame | None = None + _events: dict[str, Transaction | dict] = {} + + # first_clear_dt: datetime | None = None @property def expiry(self) -> datetime | None: - if exp := self.mkt.expiry: - return pendulum.parse(exp) + exp: str = self.mkt.expiry + match exp: + # empty str, 'perp' (contract) or simply a null + # signifies instrument with NO expiry. + case 'perp' | '' | None: + return None - def __repr__(self) -> str: - return pformat(self.to_dict()) + case str(): + return pendulum.parse(exp) + + case _: + raise ValueError( + f'Unhandled `MktPair.expiry`: `{exp}`' + ) + + # TODO: idea: "real LIFO" dynamic positioning. + # - when a trade takes place where the pnl for + # the (set of) trade(s) is below the breakeven price + # it may be that the trader took a +ve pnl on a short(er) + # term trade in the same account. + # - in this case we could recalc the be price to + # be reverted back to it's prior value before the nearest term + # trade was opened.? + # def bep() -> float: + # ... + + def clearsdict(self) -> dict[str, dict]: + clears: dict[str, dict] = ppu( + self.iter_by_type('clear'), + as_ledger=True + ) + return clears + + def iter_by_type( + self, + etype: str, + ) -> Iterator[dict | Transaction]: + ''' + Iterate the internally managed ``._events: dict`` table in + datetime-stamped order. + + ''' + # sort on the expected datetime field + for event in iter_by_dt( + self._events.values(), + key=lambda entry: + getattr(entry, 'dt', None) + or entry.get('dt'), + ): + match event: + case ( + { 'etype': _etype} | + Transaction(etype=str(_etype)) + ): + assert _etype == etype + yield event + + + def minimized_clears(self) -> dict[str, dict]: + ''' + Minimize the position's clears entries by removing + all transactions before the last net zero size except for when + a clear event causes a position "side" change (i.e. long to short + after a single fill) wherein we store the transaction prior to the + net-zero pass. + + This avoids unnecessary history irrelevant to the current + non-net-zero size state when serializing for offline storage. + + ''' + # scan for the last "net zero" position by iterating + # transactions until the next net-zero accum_size, rinse, + # repeat. + cumsize: float = 0 + clears_since_zero: list[dict] = [] + + for tid, cleardict in self.clearsdict().items(): + cumsize = float( + # self.mkt.quantize(cumsize + cleardict['tx'].size + self.mkt.quantize(cleardict['cumsize']) + ) + clears_since_zero.append(cleardict) + + # NOTE: always pop sign change since we just use it to + # determine which entry to clear "up to". + sign_change: bool = cleardict.pop('sign_change') + if cumsize == 0: + clears_since_zero = clears_since_zero[:-2] + # clears_since_zero.clear() + + elif sign_change: + clears_since_zero = clears_since_zero[:-1] + + return clears_since_zero def to_pretoml(self) -> tuple[str, dict]: ''' Prep this position's data contents for export as an entry in a TOML "account file" (such as `account.binance.paper.toml`) including re-structuring of - the ``._clears`` entries as an array of inline-subtables + the ``._events`` entries as an array of inline-subtables for better ``pps.toml`` compactness. 
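+
+        e.g. (hypothetical values, illustrative only) each clear is
+        rendered as one inline-table row of the ``clears`` array:
+
+        clears = [
+            {ppu = 10.0, cumsize = 2.0, price = 10.0, size = 2.0, cost = 0.1, dt = 2023-07-03T18:52:02-04:00, tid = "<tid>"},
+        ]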
''' - asdict = self.to_dict() - clears: list[dict] = asdict.pop('_clears') - events: dict[str, Transaction] = asdict.pop('_events') - - if self.split_ratio is None: - asdict.pop('split_ratio') - - # should be obvious from clears/event table - asdict.pop('first_clear_dt') - + mkt: MktPair = self.mkt + assert isinstance(mkt, MktPair) # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. # drop symbol obj in serialized form - mkt: MktPair = asdict.pop('mkt') - assert isinstance(mkt, MktPair) - - fqme = mkt.fqme + fqme: str = mkt.fqme broker, mktep, venue, suffix = unpack_fqme(fqme) # an asset resolved mkt where we have ``Asset`` info about # each tradeable asset in the market. + asset_type: str = 'n/a' if mkt.resolved: dst: Asset = mkt.dst - asdict['asset_type'] = dst.atype + asset_type = dst.atype - asdict['price_tick'] = mkt.price_tick - asdict['size_tick'] = mkt.size_tick + asdict: dict[str, Any] = { + 'bs_mktid': self.bs_mktid, + 'expiry': self.expiry or '', + 'asset_type': asset_type, + 'price_tick': mkt.price_tick, + 'size_tick': mkt.size_tick, + } if exp := self.expiry: - asdict['expiry'] = exp.isoformat('T') + asdict['expiry'] = exp + clears_since_zero: list[dict] = self.minimized_clears() clears_table: tomlkit.Array = tomlkit.array() clears_table.multiline( multiline=True, indent='', ) - # reverse sort so latest clears are at top of section? - for entry in iter_by_dt(clears): - + for entry in clears_since_zero: inline_table = tomlkit.inline_table() - # serialize datetime to parsable `str` - inline_table['dt'] = entry['dt'].isoformat('T') - # assert 'Datetime' not in inline_table['dt'] - # insert optional clear fields in column order - for k in ['ppu', 'accum_size']: + for k in ['ppu', 'cumsize']: if val := entry.get(k): inline_table[k] = val @@ -179,61 +267,68 @@ class Position(Struct): for k in ['price', 'size', 'cost']: inline_table[k] = entry[k] + # serialize datetime to parsable `str` + inline_table['dt'] = entry['dt']#.isoformat('T') + # assert 'Datetime' not in inline_table['dt'] + tid: str = entry['tid'] - events.pop(tid) inline_table['tid'] = tid clears_table.append(inline_table) + # if val < 0: + # breakpoint() - assert not events + # assert not events asdict['clears'] = clears_table + return fqme, asdict - def ensure_state(self) -> None: - ''' - Audit either the `.size` and `.ppu` local instance vars against - the clears table calculations and return the calc-ed values if - they differ and log warnings to console. + # def ensure_state(self) -> None: + # ''' + # Audit either the `.cumsize` and `.ppu` local instance vars against + # the clears table calculations and return the calc-ed values if + # they differ and log warnings to console. - ''' - clears: list[dict] = self._clears - self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] - last_clear: dict = clears[-1] - csize: float = self.calc_size() - accum: float = last_clear['accum_size'] + # ''' + # # clears: list[dict] = self._clears - if not self.expired(): - if ( - csize != accum - and csize != round(accum * (self.split_ratio or 1)) - ): - raise ValueError(f'Size mismatch: {csize}') - else: - assert csize == 0, 'Contract is expired but non-zero size?' 
+ # # self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] + # last_clear: dict = clears[-1] + # csize: float = self.calc_size() + # accum: float = last_clear['accum_size'] - if self.size != csize: - log.warning( - 'Position state mismatch:\n' - f'{self.size} => {csize}' - ) - self.size = csize + # if not self.expired(): + # if ( + # csize != accum + # and csize != round(accum * (self.split_ratio or 1)) + # ): + # raise ValueError(f'Size mismatch: {csize}') + # else: + # assert csize == 0, 'Contract is expired but non-zero size?' - cppu: float = self.calc_ppu() - ppu: float = last_clear['ppu'] - if ( - cppu != ppu - and self.split_ratio is not None + # if self.cumsize != csize: + # log.warning( + # 'Position state mismatch:\n' + # f'{self.cumsize} => {csize}' + # ) + # self.cumsize = csize - # handle any split info entered (for now) manually by user - and cppu != (ppu / self.split_ratio) - ): - raise ValueError(f'PPU mismatch: {cppu}') + # cppu: float = self.calc_ppu() + # ppu: float = last_clear['ppu'] + # if ( + # cppu != ppu + # and self.split_ratio is not None - if self.ppu != cppu: - log.warning( - 'Position state mismatch:\n' - f'{self.ppu} => {cppu}' - ) - self.ppu = cppu + # # handle any split info entered (for now) manually by user + # and cppu != (ppu / self.split_ratio) + # ): + # raise ValueError(f'PPU mismatch: {cppu}') + + # if self.ppu != cppu: + # log.warning( + # 'Position state mismatch:\n' + # f'{self.ppu} => {cppu}' + # ) + # self.ppu = cppu def update_from_msg( self, @@ -241,20 +336,23 @@ class Position(Struct): ) -> None: - # XXX: better place to do this? - mkt = self.mkt - size_tick_digits = mkt.size_tick_digits - price_tick_digits = mkt.price_tick_digits + mkt: MktPair = self.mkt + # we summarize the pos with a single summary transaction + # (for now) until we either pass THIS type as msg directly + # from emsd or come up with a better way? + t = Transaction( + fqme=mkt.bs_mktid, + sym=mkt, + bs_mktid=mkt.bs_mktid, + tid='unknown', + size=msg['size'], + price=msg['avg_price'], + cost=0, - self.ppu = round( - # TODO: change this to ppu? - msg['avg_price'], - ndigits=price_tick_digits, - ) - self.size = round( - msg['size'], - ndigits=size_tick_digits, + # TODO: also figure out how to avoid this! + dt=now(), ) + self.add_clear(t) @property def dsize(self) -> float: @@ -265,168 +363,6 @@ class Position(Struct): ''' return self.ppu * self.size - # TODO: idea: "real LIFO" dynamic positioning. - # - when a trade takes place where the pnl for - # the (set of) trade(s) is below the breakeven price - # it may be that the trader took a +ve pnl on a short(er) - # term trade in the same account. - # - in this case we could recalc the be price to - # be reverted back to it's prior value before the nearest term - # trade was opened.? - # def lifo_price() -> float: - # ... - - def iter_clears(self) -> Iterator[tuple[str, dict]]: - ''' - Iterate the internally managed ``._clears: dict`` table in - datetime-stamped order. - - ''' - # sort on the already existing datetime that should have - # been generated for the entry's table - return iter_by_dt( - self._clears, - key=lambda entry: entry['dt'] - ) - - def calc_ppu( - self, - - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost_scalar: float = 2, - - ) -> float: - ''' - Compute the "price-per-unit" price for the given non-zero sized - rolling position. 
- - The recurrence relation which computes this (exponential) mean - per new clear which **increases** the accumulative postiion size - is: - - ppu[-1] = ( - ppu[-2] * accum_size[-2] - + - ppu[-1] * size - ) / accum_size[-1] - - where `cost_basis` for the current step is simply the price - * size of the most recent clearing transaction. - - ''' - asize_h: list[float] = [] # historical accumulative size - ppu_h: list[float] = [] # historical price-per-unit - - entry: dict[str, Any] - for entry in self.iter_clears(): - clear_size = entry['size'] - clear_price: str | float = entry['price'] - is_clear: bool = not isinstance(clear_price, str) - - last_accum_size = asize_h[-1] if asize_h else 0 - accum_size = last_accum_size + clear_size - accum_sign = copysign(1, accum_size) - - sign_change: bool = False - - if accum_size == 0: - ppu_h.append(0) - asize_h.append(0) - continue - - # on transfers we normally write some non-valid - # price since withdrawal to another account/wallet - # has nothing to do with inter-asset-market prices. - # TODO: this should be better handled via a `type: 'tx'` - # field as per existing issue surrounding all this: - # https://github.com/pikers/piker/issues/510 - if isinstance(clear_price, str): - # TODO: we can't necessarily have this commit to - # the overall pos size since we also need to - # include other positions contributions to this - # balance or we might end up with a -ve balance for - # the position.. - continue - - # test if the pp somehow went "passed" a net zero size state - # resulting in a change of the "sign" of the size (+ve for - # long, -ve for short). - sign_change = ( - copysign(1, last_accum_size) + accum_sign == 0 - and last_accum_size != 0 - ) - - # since we passed the net-zero-size state the new size - # after sum should be the remaining size the new - # "direction" (aka, long vs. short) for this clear. - if sign_change: - clear_size = accum_size - abs_diff = abs(accum_size) - asize_h.append(0) - ppu_h.append(0) - - else: - # old size minus the new size gives us size diff with - # +ve -> increase in pp size - # -ve -> decrease in pp size - abs_diff = abs(accum_size) - abs(last_accum_size) - - # XXX: LIFO breakeven price update. only an increaze in size - # of the position contributes the breakeven price, - # a decrease does not (i.e. the position is being made - # smaller). - # abs_clear_size = abs(clear_size) - abs_new_size = abs(accum_size) - - if ( - abs_diff > 0 - and is_clear - ): - - cost_basis = ( - # cost basis for this clear - clear_price * abs(clear_size) - + - # transaction cost - accum_sign * cost_scalar * entry['cost'] - ) - - if asize_h: - size_last = abs(asize_h[-1]) - cb_last = ppu_h[-1] * size_last - ppu = (cost_basis + cb_last) / abs_new_size - - else: - ppu = cost_basis / abs_new_size - - ppu_h.append(ppu) - asize_h.append(accum_size) - - else: - # TODO: for PPU we should probably handle txs out - # (aka withdrawals) similarly by simply not having - # them contrib to the running PPU calc and only - # when the next entry clear comes in (which will - # then have a higher weighting on the PPU). - - # on "exit" clears from a given direction, - # only the size changes not the price-per-unit - # need to be updated since the ppu remains constant - # and gets weighted by the new size. 
- asize_h.append(accum_size) - ppu_h.append(ppu_h[-1]) - - final_ppu = ppu_h[-1] if ppu_h else 0 - - # handle any split info entered (for now) manually by user - if self.split_ratio is not None: - final_ppu /= self.split_ratio - - return final_ppu - def expired(self) -> bool: ''' Predicate which checks if the contract/instrument is past its expiry. @@ -434,70 +370,10 @@ class Position(Struct): ''' return bool(self.expiry) and self.expiry < now() - def calc_size(self) -> float: - ''' - Calculate the unit size of this position in the destination - asset using the clears/trade event table; zero if expired. - - ''' - size: float = 0. - - # time-expired pps (normally derivatives) are "closed" - # and have a zero size. - if self.expired(): - return 0. - - for entry in self._clears: - size += entry['size'] - # XXX: do we need it every step? - # no right since rounding is an LT? - # size = self.mkt.quantize( - # size + entry['size'], - # quantity_type='size', - # ) - - if self.split_ratio is not None: - size = round(size * self.split_ratio) - - return float( - self.mkt.quantize(size), - ) - - def minimize_clears( - self, - - ) -> dict[str, dict]: - ''' - Minimize the position's clears entries by removing - all transactions before the last net zero size to avoid - unnecessary history irrelevant to the current pp state. - - ''' - size: float = 0 - clears_since_zero: list[dict] = [] - - # TODO: we might just want to always do this when iterating - # a ledger? keep a state of the last net-zero and only do the - # full iterate when no state was stashed? - - # scan for the last "net zero" position by iterating - # transactions until the next net-zero size, rinse, repeat. - for clear in self._clears: - size = float( - self.mkt.quantize(size + clear['size']) - ) - clears_since_zero.append(clear) - - if size == 0: - clears_since_zero.clear() - - self._clears = clears_since_zero - return self._clears - def add_clear( self, t: Transaction, - ) -> dict: + ) -> bool: ''' Update clearing table by calculating the rolling ppu and (accumulative) size in both the clears entry and local @@ -506,25 +382,26 @@ class Position(Struct): Inserts are always done in datetime sorted order. ''' + added: bool = False tid: str = t.tid if tid in self._events: log.warning(f'{t} is already added?!') - return {} + return added - clear: dict[str, float | str | int] = { - 'tid': t.tid, - 'cost': t.cost, - 'price': t.price, - 'size': t.size, - 'dt': t.dt - } + # clear: dict[str, float | str | int] = { + # 'tid': t.tid, + # 'cost': t.cost, + # 'price': t.price, + # 'size': t.size, + # 'dt': t.dt + # } self._events[tid] = t - - insort( - self._clears, - clear, - key=lambda entry: entry['dt'] - ) + return True + # insort( + # self._clears, + # clear, + # key=lambda entry: entry['dt'] + # ) # TODO: compute these incrementally instead # of re-looping through each time resulting in O(n**2) @@ -533,12 +410,75 @@ class Position(Struct): # NOTE: we compute these **after** adding the entry in order to # make the recurrence relation math work inside # ``.calc_size()``. 
- self.size = clear['accum_size'] = self.calc_size() - self.ppu = clear['ppu'] = self.calc_ppu() + # self.size = clear['accum_size'] = self.calc_size() + # self.ppu = clear['ppu'] = self.calc_ppu() + # self.size: float = self.calc_size() + # self.ppu: float = self.calc_ppu() - assert len(self._events) == len(self._clears) + # assert len(self._events) == len(self._clears) + # return clear - return clear + def calc_ppu(self) -> float: + return ppu(self.iter_by_type('clear')) + + # # return self.clearsdict() + # # ) + # return list(self.clearsdict())[-1][1]['ppu'] + + @property + def ppu(self) -> float: + return round( + self.calc_ppu(), + ndigits=self.mkt.price_tick_digits, + ) + + def calc_size(self) -> float: + ''' + Calculate the unit size of this position in the destination + asset using the clears/trade event table; zero if expired. + + ''' + # time-expired pps (normally derivatives) are "closed" + # and have a zero size. + if self.expired(): + return 0. + + clears: list[dict] = list(self.clearsdict().values()) + if clears: + return clears[-1]['cumsize'] + else: + return 0. + + # if self.split_ratio is not None: + # size = round(size * self.split_ratio) + + # return float( + # self.mkt.quantize(size), + # ) + + # TODO: ideally we don't implicitly recompute the + # full sequence from `.clearsdict()` every read.. + # the writer-updates-local-attr-state was actually kinda nice + # before, but sometimes led to hard to detect bugs when + # state was de-synced. + @property + def cumsize(self) -> float: + + if ( + self.expiry + and self.expiry < now() + ): + return 0 + + return round( + self.calc_size(), + ndigits=self.mkt.size_tick_digits, + ) + + @property + def size(self) -> float: + log.warning('`Position.size` is deprecated, use `.cumsize`') + return self.cumsize # TODO: once we have an `.events` table with diff # mkt event types..? @@ -546,9 +486,7 @@ class Position(Struct): # ... -# TODO: maybe a better name is just `Account` and we include -# a table of asset balances as `.balances: dict[Asset, float]`? -class PpTable(Struct): +class Account(Struct): brokername: str acctid: str @@ -556,6 +494,9 @@ class PpTable(Struct): conf_path: Path conf: dict | None = {} + # TODO: track a table of asset balances as `.balances: + # dict[Asset, float]`? + def update_from_trans( self, trans: dict[str, Transaction], @@ -578,19 +519,17 @@ class PpTable(Struct): key=lambda t: t.dt, # reverse=True, ): - fqme = t.fqme - bs_mktid = t.bs_mktid + fqme: str = t.fqme + bs_mktid: str = t.bs_mktid # template the mkt-info presuming a legacy market ticks # if no info exists in the transactions.. mkt: MktPair = t.sys - pos = pps.get(bs_mktid) - if not pos: + + if not (pos := pps.get(bs_mktid)): # if no existing pos, allocate fresh one. pos = pps[bs_mktid] = Position( mkt=mkt, - size=0.0, - ppu=0.0, bs_mktid=bs_mktid, ) else: @@ -602,33 +541,33 @@ class PpTable(Struct): if len(pos.mkt.fqme) < len(fqme): pos.mkt = mkt - clears: list[dict] = pos._clears - if clears: - first_clear_dt = pos.first_clear_dt + # clears: list[dict] = pos._clears + # if clears: + # # first_clear_dt = pos.first_clear_dt - # don't do updates for ledger records we already have - # included in the current pps state. 
- if ( - t.tid in clears - or ( - first_clear_dt - and t.dt < first_clear_dt - ) - ): - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue + # # don't do updates for ledger records we already have + # # included in the current pps state. + # if ( + # t.tid in clears + # # or ( + # # first_clear_dt + # # and t.dt < first_clear_dt + # # ) + # ): + # # NOTE: likely you'll see repeats of the same + # # ``Transaction`` passed in here if/when you are restarting + # # a ``brokerd.ib`` where the API will re-report trades from + # # the current session, so we need to make sure we don't + # # "double count" these in pp calculations. + # continue # update clearing table pos.add_clear(t) updated[t.bs_mktid] = pos # re-calc ppu and accumulative sizing. - for bs_mktid, pos in updated.items(): - pos.ensure_state() + # for bs_mktid, pos in updated.items(): + # pos.ensure_state() # NOTE: deliver only the position entries that were # actually updated (modified the state) from the input @@ -657,29 +596,21 @@ class PpTable(Struct): pp_objs = self.pps for bs_mktid in list(pp_objs): - pp = pp_objs[bs_mktid] - pp.ensure_state() + pos = pp_objs[bs_mktid] + # pos.ensure_state() - if ( - # "net-zero" is a "closed" position - pp.size == 0 - - # time-expired pps (normally derivatives) are "closed" - or (pp.expiry and pp.expiry < now()) - ): - # for expired cases - pp.size = 0 - - # NOTE: we DO NOT pop the pp here since it can still be + # "net-zero" is a "closed" position + if pos.cumsize == 0: + # NOTE: we DO NOT pop the pos here since it can still be # used to check for duplicate clears that may come in as # new transaction from some backend API and need to be # ignored; the closed positions won't be written to the # ``pps.toml`` since ``pp_active_entries`` above is what's # written. - closed_pp_objs[bs_mktid] = pp + closed_pp_objs[bs_mktid] = pos else: - open_pp_objs[bs_mktid] = pp + open_pp_objs[bs_mktid] = pos return open_pp_objs, closed_pp_objs @@ -700,13 +631,14 @@ class PpTable(Struct): for bs_mktid, pos in active.items(): # NOTE: we only store the minimal amount of clears that make up this # position since the last net-zero state. - pos.minimize_clears() - pos.ensure_state() + # pos.minimize_clears() + # pos.ensure_state() # serialize to pre-toml form fqme, asdict = pos.to_pretoml() - assert 'Datetime' not in asdict['clears'][0]['dt'] + # clears: list[dict] = asdict['clears'] + # assert 'Datetime' not in [0]['dt'] log.info(f'Updating active pp: {fqme}') # XXX: ugh, it's cuz we push the section under @@ -769,12 +701,28 @@ class PpTable(Struct): for entry in list(self.conf): del self.conf[entry] + # XXX WTF: if we use a tomlkit.Integer here we get this + # super weird --1 thing going on for cumsize!?1! + # NOTE: the fix was to always float() the size value loaded + # in open_pps() below! + + # confclears = self.conf["tsla.nasdaq.ib"]['clears'] + # firstcum = confclears[0]['cumsize'] + # if firstcum: + # breakpoint() + config.write( config=self.conf, path=self.conf_path, fail_empty=False, ) + # breakpoint() + + +# TODO: move over all broker backend usage to new name.. 
+PpTable = Account + def load_account( brokername: str, @@ -928,8 +876,7 @@ def open_pps( toml_clears_list: list[dict[str, Any]] = entry['clears'] trans: list[Transaction] = [] for clears_table in toml_clears_list: - - tid = clears_table.get('tid') + tid = clears_table['tid'] dt: tomlkit.items.DateTime | str = clears_table['dt'] # woa cool, `tomlkit` will actually load datetimes into @@ -943,29 +890,33 @@ def open_pps( sym=mkt, bs_mktid=bs_mktid, tid=tid, - size=clears_table['size'], - price=clears_table['price'], + size=float(clears_table['size']), + price=float(clears_table['price']), cost=clears_table['cost'], dt=dt, )) - size = entry['size'] + # size = entry['size'] - # TODO: remove but, handle old field name for now - ppu = entry.get( - 'ppu', - entry.get('be_price', 0), - ) + # # TODO: remove but, handle old field name for now + # ppu = entry.get( + # 'ppu', + # entry.get('be_price', 0), + # ) split_ratio = entry.get('split_ratio') - if expiry := entry.get('expiry'): - expiry = pendulum.parse(expiry) + # if a string-ified expiry field is loaded we try to parse + # it, THO, they should normally be serialized as native + # TOML datetimes, since that's supported. + if ( + (expiry := entry.get('expiry')) + and isinstance(expiry, str) + ): + expiry: pendulum.DateTime = pendulum.parse(expiry) pp = pp_objs[bs_mktid] = Position( mkt, - size=size, - ppu=ppu, split_ratio=split_ratio, bs_mktid=bs_mktid, ) @@ -979,7 +930,7 @@ def open_pps( pp.add_clear(t) # audit entries loaded from toml - pp.ensure_state() + # pp.ensure_state() try: yield table @@ -994,10 +945,10 @@ def load_pps_from_ledger( acctname: str, # post normalization filter on ledger entries to be processed - filter_by_ids: list[str] | None = None, + filter_by_ids: dict[str, list[str]] | None = None, ) -> tuple[ - dict[str, Transaction], + pl.DataFrame, PpTable, ]: ''' @@ -1022,20 +973,52 @@ def load_pps_from_ledger( src_records: dict[str, Transaction] = mod.norm_trade_records( ledger ) + table.update_from_trans(src_records) - if not filter_by_ids: - # records = src_records - records = ledger + fdf = df = pl.DataFrame( + list(rec.to_dict() for rec in src_records.values()), + # schema=[ + # ('tid', str), + # ('fqme', str), + # ('dt', str), + # ('size', pl.Float64), + # ('price', pl.Float64), + # ('cost', pl.Float64), + # ('expiry', str), + # ('bs_mktid', str), + # ], + ).sort('dt').select([ + pl.col('fqme'), + pl.col('dt').str.to_datetime(), + # pl.col('expiry').dt.datetime(), + pl.col('bs_mktid'), + pl.col('size'), + pl.col('price'), + ]) + # ppt = df.groupby('fqme').agg([ + # # TODO: ppu and bep !! + # pl.cumsum('size').alias('cumsum'), + # ]) + acts = df.partition_by('fqme', as_dict=True) + # ppt: dict[str, pl.DataFrame] = {} + # for fqme, ppt in act.items(): + # ppt.with_columuns + # # TODO: ppu and bep !! + # pl.cumsum('size').alias('cumsum'), + # ]) - else: - records = {} - bs_mktids = set(map(str, filter_by_ids)) + # filter out to the columns matching values filter passed + # as input. 
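+    # e.g. (hypothetical values) passing
+    # `filter_by_ids={'fqme': ['tsla.nasdaq.ib']}` keeps only the rows
+    # whose `fqme` column matches one of the listed values.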
+ if filter_by_ids: + for col, vals in filter_by_ids.items(): + str_vals = set(map(str, vals)) + pred: pl.Expr = pl.col(col).eq(str_vals.pop()) + for val in str_vals: + pred |= pl.col(col).eq(val) - # for tid, recdict in ledger.items(): - for tid, r in src_records.items(): - if r.bs_mktid in bs_mktids: - records[tid] = r.to_dict() + fdf = df.filter(pred) - # updated = table.update_from_trans(records) + bs_mktid: str = fdf[0]['bs_mktid'] + # pos: Position = table.pps[bs_mktid] - return records, table + return fdf, acts, table diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py new file mode 100644 index 00000000..034c810a --- /dev/null +++ b/piker/accounting/calc.py @@ -0,0 +1,276 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Calculation routines for balance and position tracking such that +you know when you're losing money (if possible) XD + +''' +from __future__ import annotations +from math import copysign +from typing import ( + Any, + Callable, + Iterator, + TYPE_CHECKING, +) + +from pendulum import ( + # datetime, + DateTime, + from_timestamp, + parse, +) + +if TYPE_CHECKING: + from ._ledger import ( + Transaction, + ) + + +def ppu( + clears: Iterator[Transaction], + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost_scalar: float = 2, + + # return the ledger of clears as a (now dt sorted) dict with + # new position fields inserted alongside each entry. + as_ledger: bool = False, + +) -> float: + ''' + Compute the "price-per-unit" price for the given non-zero sized + rolling position. + + The recurrence relation which computes this (exponential) mean + per new clear which **increases** the accumulative postiion size + is: + + ppu[-1] = ( + ppu[-2] * accum_size[-2] + + + ppu[-1] * size + ) / accum_size[-1] + + where `cost_basis` for the current step is simply the price + * size of the most recent clearing transaction. 
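+
+    A worked example (hypothetical fills, transaction costs ignored,
+    purely illustrative):
+
+    - buy 10 @ 100  -> accum_size = 10, ppu = 100
+    - buy 10 @ 120  -> accum_size = 20, ppu = (100*10 + 120*10) / 20 = 110
+    - sell 5 @ 130  -> accum_size = 15, ppu stays 110 since "exit"
+      clears only change the size, never the price-per-unit.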
+ + ''' + asize_h: list[float] = [] # historical accumulative size + ppu_h: list[float] = [] # historical price-per-unit + ledger: dict[str, dict] = {} + + # entry: dict[str, Any] | Transaction + t: Transaction + for t in clears: + # tid: str = entry['tid'] + # clear_size = entry['size'] + clear_size: float = t.size + # clear_price: str | float = entry['price'] + clear_price: str | float = t.price + is_clear: bool = not isinstance(clear_price, str) + + last_accum_size = asize_h[-1] if asize_h else 0 + accum_size = last_accum_size + clear_size + accum_sign = copysign(1, accum_size) + + sign_change: bool = False + + if accum_size == 0: + ppu_h.append(0) + asize_h.append(0) + continue + + # on transfers we normally write some non-valid + # price since withdrawal to another account/wallet + # has nothing to do with inter-asset-market prices. + # TODO: this should be better handled via a `type: 'tx'` + # field as per existing issue surrounding all this: + # https://github.com/pikers/piker/issues/510 + if isinstance(clear_price, str): + # TODO: we can't necessarily have this commit to + # the overall pos size since we also need to + # include other positions contributions to this + # balance or we might end up with a -ve balance for + # the position.. + continue + + # test if the pp somehow went "passed" a net zero size state + # resulting in a change of the "sign" of the size (+ve for + # long, -ve for short). + sign_change = ( + copysign(1, last_accum_size) + accum_sign == 0 + and last_accum_size != 0 + ) + + # since we passed the net-zero-size state the new size + # after sum should be the remaining size the new + # "direction" (aka, long vs. short) for this clear. + if sign_change: + clear_size = accum_size + abs_diff = abs(accum_size) + asize_h.append(0) + ppu_h.append(0) + + else: + # old size minus the new size gives us size diff with + # +ve -> increase in pp size + # -ve -> decrease in pp size + abs_diff = abs(accum_size) - abs(last_accum_size) + + # XXX: LIFO breakeven price update. only an increaze in size + # of the position contributes the breakeven price, + # a decrease does not (i.e. the position is being made + # smaller). + # abs_clear_size = abs(clear_size) + abs_new_size: float | int = abs(accum_size) + + if ( + abs_diff > 0 + and is_clear + ): + + cost_basis = ( + # cost basis for this clear + clear_price * abs(clear_size) + + + # transaction cost + # accum_sign * cost_scalar * entry['cost'] + accum_sign * cost_scalar * t.cost + ) + + if asize_h: + size_last = abs(asize_h[-1]) + cb_last = ppu_h[-1] * size_last + ppu = (cost_basis + cb_last) / abs_new_size + + else: + ppu = cost_basis / abs_new_size + + # ppu_h.append(ppu) + # asize_h.append(accum_size) + + else: + # TODO: for PPU we should probably handle txs out + # (aka withdrawals) similarly by simply not having + # them contrib to the running PPU calc and only + # when the next entry clear comes in (which will + # then have a higher weighting on the PPU). + + # on "exit" clears from a given direction, + # only the size changes not the price-per-unit + # need to be updated since the ppu remains constant + # and gets weighted by the new size. 
+ ppu: float = ppu_h[-1] # set to previous value + # ppu_h.append(ppu_h[-1]) + # asize_h.append(accum_size) + + # extend with new rolling metric for this step + ppu_h.append(ppu) + asize_h.append(accum_size) + + # ledger[t.tid] = { + # 'tx': t, + ledger[t.tid] = t.to_dict() | { + 'ppu': ppu, + 'cumsize': accum_size, + 'sign_change': sign_change, + + # TODO: cumpnl, bep + } + + final_ppu = ppu_h[-1] if ppu_h else 0 + # TODO: once we have etypes in all ledger entries.. + # handle any split info entered (for now) manually by user + # if self.split_ratio is not None: + # final_ppu /= self.split_ratio + + if as_ledger: + return ledger + + else: + return final_ppu + + +def iter_by_dt( + records: ( + dict[str, dict[str, Any]] + | list[dict] + | list[Transaction] # XXX preferred! + ), + + # NOTE: parsers are looked up in the insert order + # so if you know that the record stats show some field + # is more common then others, stick it at the top B) + parsers: dict[tuple[str], Callable] = { + 'dt': None, # parity case + 'datetime': parse, # datetime-str + 'time': from_timestamp, # float epoch + }, + key: Callable | None = None, + +) -> Iterator[tuple[str, dict]]: + ''' + Iterate entries of a transaction table sorted by entry recorded + datetime presumably set at the ``'dt'`` field in each entry. + + ''' + # isdict: bool = False + if isinstance(records, dict): + # isdict: bool = True + records = list(records.items()) + + def dyn_parse_to_dt( + tx: tuple[str, dict[str, Any]] | Transaction, + ) -> DateTime: + + # handle `.items()` inputs + if isinstance(tx, tuple): + tx = tx[1] + + # dict or tx object? + isdict: bool = isinstance(tx, dict) + + # get best parser for this record.. + for k in parsers: + if ( + isdict and k in tx + or getattr(tx, k, None) + ): + v = tx[k] if isdict else tx.dt + if v is None: + breakpoint() + + parser = parsers[k] + + # only call parser on the value if not None from the + # `parsers` table above, otherwise pass through the value + # and sort on it directly + return parser(v) if (parser is not None) else v + + else: + breakpoint() + + entry: tuple[str, dict] | Transaction + for entry in sorted( + records, + key=key or dyn_parse_to_dt, + ): + yield entry diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index 290c1a5e..a562e26e 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -240,9 +240,13 @@ def sync( def disect( # "fully_qualified_account_name" fqan: str, - bs_mktid: str, # for ib + fqme: str, # for ib pdb: bool = False, + bs_mktid: str = typer.Option( + None, + "-bid", + ), loglevel: str = typer.Option( 'error', "-l", @@ -255,36 +259,24 @@ def disect( brokername, account = pair # ledger: TransactionLedger - records: dict[str, dict] + # records: dict[str, dict] table: PpTable - records, table = load_pps_from_ledger( + df: pl.DataFrame # legder df + ppt: pl.DataFrame # piker position table + df, ppt, table = load_pps_from_ledger( brokername, account, - filter_by_ids={bs_mktid}, + filter_by_ids={'fqme': [fqme]}, ) - df = pl.DataFrame( - list(records.values()), - # schema=[ - # ('tid', str), - # ('fqme', str), - # ('dt', str), - # ('size', pl.Float64), - # ('price', pl.Float64), - # ('cost', pl.Float64), - # ('expiry', str), - # ('bs_mktid', str), - # ], - ).select([ - pl.col('fqme'), - pl.col('dt').str.to_datetime(), - # pl.col('expiry').dt.datetime(), - pl.col('size'), - pl.col('price'), - ]) - + # sers = [ + # pl.Series(e['fqme'], e['cumsum']) + # for e in ppt.to_dicts() + # ] + # ppt_by_id: pl.DataFrame = ppt.filter( + # pl.col('fqme') == 
fqme,
+    # )
     assert not df.is_empty()
     breakpoint()
-    # tractor.pause_from_sync()
     # with open_trade_ledger(
     #     brokername,
     #     account,
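
A rough usage sketch (illustrative only, not part of the patch) of the
now "stateless" position metrics introduced above: `.ppu` and `.cumsize`
are re-derived from the `._events` clears table on every property read
instead of being mutated inside `.add_clear()`. The `mkt: MktPair` input
and the exact import paths are assumptions based on this diff:

# sketch: feed a few clears into a `Position` and read back the rolling
# metrics, which are re-calc-ed from the clears table on each access.
from pendulum import now

from piker.accounting._ledger import Transaction
from piker.accounting._pos import Position


def demo_pos(mkt) -> None:
    pos = Position(
        mkt=mkt,
        bs_mktid=mkt.bs_mktid,
    )
    fills: list[tuple[float, float]] = [
        (10, 100.),   # buy 10 @ 100
        (10, 120.),   # buy 10 @ 120
        (-5, 130.),   # sell 5 @ 130
    ]
    for i, (size, price) in enumerate(fills):
        # `.etype` defaults to 'clear'; tids here are hypothetical
        pos.add_clear(Transaction(
            fqme=mkt.fqme,
            sym=mkt,
            bs_mktid=mkt.bs_mktid,
            tid=f'demo-{i}',
            size=float(size),
            price=float(price),
            cost=0.,
            dt=now(),
        ))

    # both props run the clears-table calcs on read
    print(f'{mkt.fqme}: cumsize={pos.cumsize} ppu={pos.ppu}')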