From c0d575c009a8c3cb90495c9b52b435fda78518eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Jun 2023 12:58:50 -0400 Subject: [PATCH] Change `Position.clears` -> `._clears[list[dict]]` When you look at usage we don't end up really needing clear entries to be keyed by their `Transaction.tid`; instead it's much more important to ensure the time sorted order of trade-clearing transactions such that position properties such as the size and ppu are calculated correctly. Thus, this instead simplifies the `.clears` table to a list of clear dict entries making a bunch of things simpler: - object form `Position._clears` compared to the offline TOML schema (saved in account files) is now data-structure-symmetrical. - `Position.add_clear()` now uses `bisect.insort()` to datetime-field-sort-insert into the *list* which saves having to worry about sorting on every sequence *read*. Further details: - adjust `.accounting._ledger.iter_by_dt()` to expect an input `list`. - change `Position.iter_clears()` to iterate only the clearing entry dicts without yielding a key/tid; no more tuples. - drop `Position.to_dict()` since parent `Struct` already implements it. --- piker/accounting/_ledger.py | 30 +++---- piker/accounting/_pos.py | 170 +++++++++++++++++++----------------- 2 files changed, 106 insertions(+), 94 deletions(-) diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py index 04ee04b7..268a81fc 100644 --- a/piker/accounting/_ledger.py +++ b/piker/accounting/_ledger.py @@ -160,15 +160,16 @@ class TransactionLedger(UserDict): # normer = mod.norm_trade_record(txdict) # TODO: use tx_sort here yah? - for tid, txdict in self.data.items(): + for txdict in self.tx_sort(self.data.values()): + # for tid, txdict in self.data.items(): # special field handling for datetimes # to ensure pendulum is used! 
- fqme = txdict.get('fqme') or txdict['fqsn'] - dt = parse(txdict['dt']) - expiry = txdict.get('expiry') + tid: str = txdict['tid'] + fqme: str = txdict.get('fqme') or txdict['fqsn'] + dt: DateTime = parse(txdict['dt']) + expiry: str | None = txdict.get('expiry') - mkt = mkt_by_fqme.get(fqme) - if not mkt: + if not (mkt := mkt_by_fqme.get(fqme)): # we can't build a trans if we don't have # the ``.sys: MktPair`` info, so skip. continue @@ -229,7 +230,7 @@ class TransactionLedger(UserDict): def iter_by_dt( - records: dict[str, Any], + records: dict[str, dict[str, Any]] | list[dict], # NOTE: parsers are looked up in the insert order # so if you know that the record stats show some field @@ -247,21 +248,20 @@ def iter_by_dt( datetime presumably set at the ``'dt'`` field in each entry. ''' - def dyn_parse_to_dt( - pair: tuple[str, dict], - ) -> DateTime: - _, txdict = pair + def dyn_parse_to_dt(txdict: dict[str, Any]) -> DateTime: k, v, parser = next( (k, txdict[k], parsers[k]) for k in parsers if k in txdict ) - return parser(v) if parser else v - for tid, data in sorted( - records.items(), + if isinstance(records, dict): + records = records.values() + + for entry in sorted( + records, key=key or dyn_parse_to_dt, ): - yield tid, data + yield entry def load_ledger( diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index f50040cb..1288c688 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -22,6 +22,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. ''' from __future__ import annotations +from bisect import insort from contextlib import contextmanager as cm from decimal import Decimal from math import copysign @@ -30,7 +31,6 @@ from pathlib import Path from typing import ( Any, Iterator, - Union, Generator ) @@ -52,7 +52,6 @@ from ._mktinfo import ( from .. 
import config from ..clearing._messages import ( BrokerdPosition, - Status, ) from ..data.types import Struct from ..log import get_logger @@ -66,16 +65,17 @@ class Position(Struct): A financial "position" in `piker` terms is a summary of accounting metrics computed from a transaction ledger; generally it describes - some acumulative "size" and "average price" from the summarized + some accumulative "size" and "average price" from the summarized underlying transaction set. In piker we focus on the `.ppu` (price per unit) and the `.bep` (break even price) including all transaction entries and exits since the last "net-zero" size of the destination asset's holding. - This interface serves as an object API for computing and tracking - positions as well as supports serialization for storage in the local - file system (in TOML) and to interchange as a msg over IPC. + This interface serves as an object API for computing and + tracking positions as well as supports serialization for + storage in the local file system (in TOML) and to interchange + as a msg over IPC. ''' mkt: MktPair @@ -100,10 +100,9 @@ class Position(Struct): split_ratio: int | None = None # ordered record of known constituent trade messages - clears: dict[ - Union[str, int, Status], # trade id + _clears: list[ dict[str, Any], # transaction history summaries - ] = {} + ] = [] first_clear_dt: datetime | None = None expiry: datetime | None = None @@ -111,34 +110,30 @@ class Position(Struct): def __repr__(self) -> str: return pformat(self.to_dict()) - def to_dict(self) -> dict: - return { - f: getattr(self, f) - for f in self.__struct_fields__ - } - def to_pretoml(self) -> tuple[str, dict]: ''' - Prep this position's data contents for export to toml including - re-structuring of the ``.clears`` table to an array of - inline-subtables for better ``pps.toml`` compactness. 
+ Prep this position's data contents for export as an entry + in a TOML "account file" (such as + `account.binance.paper.toml`) including re-structuring of + the ``._clears`` entries as an array of inline-subtables + for better ``pps.toml`` compactness. ''' - d = self.to_dict() - clears = d.pop('clears') - expiry = d.pop('expiry') + asdict = self.to_dict() + clears: list[dict] = asdict.pop('_clears') + expiry = asdict.pop('expiry') if self.split_ratio is None: - d.pop('split_ratio') + asdict.pop('split_ratio') # should be obvious from clears/event table - d.pop('first_clear_dt') + asdict.pop('first_clear_dt') # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. # drop symbol obj in serialized form - mkt: MktPair = d.pop('mkt') + mkt: MktPair = asdict.pop('mkt') assert isinstance(mkt, MktPair) fqme = mkt.fqme @@ -148,15 +143,15 @@ class Position(Struct): # each tradeable asset in the market. if mkt.resolved: dst: Asset = mkt.dst - d['asset_type'] = dst.atype + asdict['asset_type'] = dst.atype - d['price_tick'] = mkt.price_tick - d['size_tick'] = mkt.size_tick + asdict['price_tick'] = mkt.price_tick + asdict['size_tick'] = mkt.size_tick if self.expiry is None: - d.pop('expiry', None) + asdict.pop('expiry', None) elif expiry: - d['expiry'] = str(expiry) + asdict['expiry'] = str(expiry) clears_table: tomlkit.Array = tomlkit.array() clears_table.multiline( @@ -165,30 +160,29 @@ class Position(Struct): ) # reverse sort so latest clears are at top of section? 
- for tid, data in iter_by_dt(clears): + for entry in iter_by_dt(clears): inline_table = tomlkit.inline_table() # serialize datetime to parsable `str` - dtstr = inline_table['dt'] = data['dt'].isoformat('T') + dtstr = inline_table['dt'] = entry['dt'].isoformat('T') assert 'Datetime' not in dtstr # insert optional clear fields in column order for k in ['ppu', 'accum_size']: - val = data.get(k) - if val: + if val := entry.get(k): inline_table[k] = val # insert required fields for k in ['price', 'size', 'cost']: - inline_table[k] = data[k] + inline_table[k] = entry[k] - inline_table['tid'] = tid + inline_table['tid'] = entry['tid'] clears_table.append(inline_table) - d['clears'] = clears_table + asdict['clears'] = clears_table - return fqme, d + return fqme, asdict def ensure_state(self) -> None: ''' @@ -197,18 +191,16 @@ class Position(Struct): they differ and log warnings to console. ''' - clears = list(self.clears.values()) - self.first_clear_dt = min( - list(entry['dt'] for entry in clears) - ) - last_clear = clears[-1] + clears: list[dict] = self._clears + self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] + last_clear: dict = clears[-1] + csize: float = self.calc_size() + accum: float = last_clear['accum_size'] - csize = self.calc_size() - accum = last_clear['accum_size'] if not self.expired(): if ( csize != accum - and csize != round(accum * self.split_ratio or 1) + and csize != round(accum * (self.split_ratio or 1)) ): raise ValueError(f'Size mismatch: {csize}') else: @@ -221,11 +213,12 @@ class Position(Struct): ) self.size = csize - cppu = self.calc_ppu() - ppu = last_clear['ppu'] + cppu: float = self.calc_ppu() + ppu: float = last_clear['ppu'] if ( cppu != ppu and self.split_ratio is not None + # handle any split info entered (for now) manually by user and cppu != (ppu / self.split_ratio) ): @@ -281,15 +274,15 @@ class Position(Struct): def iter_clears(self) -> Iterator[tuple[str, dict]]: ''' - Iterate the internally managed ``.clears: dict`` 
table in + Iterate the internally managed ``._clears: dict`` table in datetime-stamped order. ''' # sort on the already existing datetime that should have # been generated for the entry's table return iter_by_dt( - self.clears, - key=lambda entry: entry[1]['dt'] + self._clears, + key=lambda entry: entry['dt'] ) def calc_ppu( @@ -323,9 +316,8 @@ class Position(Struct): asize_h: list[float] = [] # historical accumulative size ppu_h: list[float] = [] # historical price-per-unit - tid: str entry: dict[str, Any] - for (tid, entry) in self.iter_clears(): + for entry in self.iter_clears(): clear_size = entry['size'] clear_price: str | float = entry['price'] is_clear: bool = not isinstance(clear_price, str) @@ -451,7 +443,7 @@ class Position(Struct): if self.expired(): return 0. - for tid, entry in self.clears.items(): + for entry in self._clears: size += entry['size'] # XXX: do we need it every step? # no right since rounding is an LT? @@ -474,11 +466,11 @@ class Position(Struct): ''' Minimize the position's clears entries by removing all transactions before the last net zero size to avoid - unecessary history irrelevant to the current pp state. + unnecessary history irrelevant to the current pp state. ''' size: float = 0 - clears_since_zero: list[tuple(str, dict)] = [] + clears_since_zero: list[dict] = [] # TODO: we might just want to always do this when iterating # a ledger? keep a state of the last net-zero and only do the @@ -486,34 +478,44 @@ class Position(Struct): # scan for the last "net zero" position by iterating # transactions until the next net-zero size, rinse, repeat. 
- for tid, clear in self.clears.items(): + for clear in self._clears: size = float( self.mkt.quantize(size + clear['size']) ) - clears_since_zero.append((tid, clear)) + clears_since_zero.append(clear) if size == 0: clears_since_zero.clear() - self.clears = dict(clears_since_zero) - return self.clears + self._clears = clears_since_zero + return self._clears def add_clear( self, t: Transaction, ) -> dict: ''' - Update clearing table and populate rolling ppu and accumulative - size in both the clears entry and local attrs state. + Update clearing table by calculating the rolling ppu and + (accumulative) size in both the clears entry and local + attrs state. + + Inserts are always done in datetime sorted order. ''' - clear = self.clears[t.tid] = { + clear: dict[str, float | str | int] = { + 'tid': t.tid, 'cost': t.cost, 'price': t.price, 'size': t.size, 'dt': t.dt } + insort( + self._clears, + clear, + key=lambda entry: entry['dt'] + ) + # TODO: compute these incrementally instead # of re-looping through each time resulting in O(n**2) # behaviour..? @@ -526,10 +528,14 @@ class Position(Struct): return clear - # def sugest_split(self) -> float: + # TODO: once we have an `.events` table with diff + # mkt event types..? + # def suggest_split(self) -> float: # ... +# TODO: maybe a better name is just `Account` and we include +# a table of asset balances as `.balances: dict[Asset, float]`? class PpTable(Struct): brokername: str @@ -544,7 +550,12 @@ class PpTable(Struct): cost_scalar: float = 2, ) -> dict[str, Position]: + ''' + Update the internal `.pps[str, Position]` table from input + transactions recomputing the price-per-unit (ppu) and + accumulative size for each entry. 
+ ''' pps = self.pps updated: dict[str, Position] = {} @@ -553,7 +564,7 @@ class PpTable(Struct): for t in sorted( trans.values(), key=lambda t: t.dt, - reverse=True, + # reverse=True, ): fqme = t.fqme bs_mktid = t.bs_mktid @@ -561,10 +572,10 @@ class PpTable(Struct): # template the mkt-info presuming a legacy market ticks # if no info exists in the transactions.. mkt: MktPair = t.sys - pp = pps.get(bs_mktid) - if not pp: - # if no existing pp, allocate fresh one. - pp = pps[bs_mktid] = Position( + pos = pps.get(bs_mktid) + if not pos: + # if no existing pos, allocate fresh one. + pos = pps[bs_mktid] = Position( mkt=mkt, size=0.0, ppu=0.0, @@ -577,12 +588,12 @@ class PpTable(Struct): # a shorter string), instead use the one from the # transaction since it likely has (more) full # information from the provider. - if len(pp.mkt.fqme) < len(fqme): - pp.mkt = mkt + if len(pos.mkt.fqme) < len(fqme): + pos.mkt = mkt - clears = pp.clears + clears: list[dict] = pos._clears if clears: - first_clear_dt = pp.first_clear_dt + first_clear_dt = pos.first_clear_dt # don't do updates for ledger records we already have # included in the current pps state. @@ -601,15 +612,16 @@ class PpTable(Struct): continue # update clearing table - pp.add_clear(t) - updated[t.bs_mktid] = pp + pos.add_clear(t) + updated[t.bs_mktid] = pos - # minimize clears tables and update sizing. - for bs_mktid, pp in updated.items(): - pp.ensure_state() + # re-calc ppu and accumulative sizing. + for bs_mktid, pos in updated.items(): + pos.ensure_state() - # deliver only the position entries that were actually updated - # (modified the state) from the input transaction set. + # NOTE: deliver only the position entries that were + # actually updated (modified the state) from the input + # transaction set. return updated def dump_active(