From 674783167710c1afdbc8892324731dc791d9f5ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Jul 2022 08:26:28 -0400 Subject: [PATCH] Don't pop zero pps from table in `.dump_active()` In order to avoid double transaction adds/updates and too-early-discard of zero sized pps (like when trades are loaded from a backend broker but were already added to a ledger or `pps.toml` prior) we now **don't** pop such `Position` entries from the `.pps` table in order to keep each position's clears table always in place. This avoids the edge case where an entry was removed too early (due to zero size) but then duplicate trade entries that were in that entrie's clears show up from the backend and are entered into a new entry resulting in an incorrect size in a new entry..We still only push non-net-zero entries to the `pps.toml`. More fixes: - return the updated set of `Positions` from `.lifo_update()`. - return the full table set from `update_pps()`. - use `PpTable.update_from_trans()` more throughout. - always write the `pps.toml` on `open_pps()` exit. - only return table from `load_pps_from_toml()`. --- piker/pp.py | 87 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index dd6ba3aa..55ae924f 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -34,7 +34,6 @@ from typing import ( Union, ) -from msgspec import Struct import pendulum from pendulum import datetime, now import tomli @@ -45,6 +44,7 @@ from .brokers import get_brokermod from .clearing._messages import BrokerdPosition, Status from .data._source import Symbol from .log import get_logger +from .data.types import Struct log = get_logger(__name__) @@ -314,6 +314,8 @@ class PpTable(Struct): pps = self.pps + updated: dict[str, Position] = {} + # lifo update all pps from records for tid, r in trans.items(): @@ -358,7 +360,9 @@ class PpTable(Struct): # track clearing data pp.update(r) - return pps + updated[r.bsuid] = pp + + return updated def dump_active( self, @@ -393,16 +397,23 @@ class PpTable(Struct): pp.minimize_clears() if ( + # "net-zero" is a "closed" position pp.size == 0 - # drop time-expired positions (normally derivatives) + # time-expired pps (normally derivatives) are "closed" or (pp.expiry and pp.expiry < now()) ): - # if expired the position is closed + # for expired cases pp.size = 0 - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) + # NOTE: we DO NOT pop the pp here since it can still be + # used to check for duplicate clears that may come in as + # new transaction from some backend API and need to be + # ignored; the closed positions won't be written to the + # ``pps.toml`` since ``pp_entries`` above is what's + # written. + # closed_pp = pp_objs.pop(bsuid, None) + closed_pp = pp_objs.get(bsuid) if closed_pp: closed_pp_objs[bsuid] = closed_pp @@ -440,7 +451,9 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - return PpTable(pps).update_from_trans(records) + table = PpTable(pps) + table.update_from_trans(records) + return table.pps def load_trans_from_ledger( @@ -629,7 +642,7 @@ def load_pps_from_toml( # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, -) -> tuple[dict, dict[str, Position]]: +) -> tuple[PpTable, dict[str, str]]: ''' Load and marshal to objects all pps from either an existing ``pps.toml`` config, or from scratch from a ledger file when @@ -646,9 +659,7 @@ def load_pps_from_toml( brokername, acctid, ) - # TODO: just call `.update_from_trans()`? - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) # Reload symbol specific ledger entries if requested by the # caller **AND** none exist in the current pps state table. @@ -662,15 +673,14 @@ def load_pps_from_toml( acctid, filter_by=reload_records, ) - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) - if not pp_objs: + if not table.pps: log.warning( f'No `pps.toml` values could be loaded {brokername}:{acctid}' ) - return table, table.conf, table.pps + return table, table.conf @cm @@ -687,6 +697,7 @@ def open_pps( conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + pp_objs = {} table = PpTable(pp_objs, conf=conf) @@ -747,32 +758,33 @@ def open_pps( clears=clears, ) - orig = pp_objs.copy() + # orig = pp_objs.copy() try: yield table finally: - if orig != pp_objs: + # breakpoint() + # if orig != table.pps: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') - pp_entries, closed_pp_objs = table.dump_active(brokername) - conf[brokername][acctid] = pp_entries + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table - config.write( - conf, - 'pps', - encoder=enc, - ) + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -803,10 +815,7 @@ def update_pps_conf( for tid, r in trade_records.items(): ledger_reload[r.bsuid] = r.fqsn - # this maps `.bsuid` values to positions - pp_objs: dict[Union[str, int], Position] - - table, conf, pp_objs = load_pps_from_toml( + table, conf = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -817,7 +826,9 @@ def update_pps_conf( if trade_records: table.update_from_trans(trade_records) + # this maps `.bsuid` values to positions pp_entries, closed_pp_objs = table.dump_active(brokername) + pp_objs: dict[Union[str, int], Position] = table.pps conf[brokername][acctid] = pp_entries