diff --git a/piker/pp.py b/piker/pp.py index dd6ba3aa..55ae924f 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -34,7 +34,6 @@ from typing import ( Union, ) -from msgspec import Struct import pendulum from pendulum import datetime, now import tomli @@ -45,6 +44,7 @@ from .brokers import get_brokermod from .clearing._messages import BrokerdPosition, Status from .data._source import Symbol from .log import get_logger +from .data.types import Struct log = get_logger(__name__) @@ -314,6 +314,8 @@ class PpTable(Struct): pps = self.pps + updated: dict[str, Position] = {} + # lifo update all pps from records for tid, r in trans.items(): @@ -358,7 +360,9 @@ class PpTable(Struct): # track clearing data pp.update(r) - return pps + updated[r.bsuid] = pp + + return updated def dump_active( self, @@ -393,16 +397,23 @@ class PpTable(Struct): pp.minimize_clears() if ( + # "net-zero" is a "closed" position pp.size == 0 - # drop time-expired positions (normally derivatives) + # time-expired pps (normally derivatives) are "closed" or (pp.expiry and pp.expiry < now()) ): - # if expired the position is closed + # for expired cases pp.size = 0 - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) + # NOTE: we DO NOT pop the pp here since it can still be + # used to check for duplicate clears that may come in as + # new transaction from some backend API and need to be + # ignored; the closed positions won't be written to the + # ``pps.toml`` since ``pp_entries`` above is what's + # written. + # closed_pp = pp_objs.pop(bsuid, None) + closed_pp = pp_objs.get(bsuid) if closed_pp: closed_pp_objs[bsuid] = closed_pp @@ -440,7 +451,9 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - return PpTable(pps).update_from_trans(records) + table = PpTable(pps) + table.update_from_trans(records) + return table.pps def load_trans_from_ledger( @@ -629,7 +642,7 @@ def load_pps_from_toml( # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, -) -> tuple[dict, dict[str, Position]]: +) -> tuple[PpTable, dict[str, str]]: ''' Load and marshal to objects all pps from either an existing ``pps.toml`` config, or from scratch from a ledger file when @@ -646,9 +659,7 @@ def load_pps_from_toml( brokername, acctid, ) - # TODO: just call `.update_from_trans()`? - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) # Reload symbol specific ledger entries if requested by the # caller **AND** none exist in the current pps state table. @@ -662,15 +673,14 @@ def load_pps_from_toml( acctid, filter_by=reload_records, ) - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) - if not pp_objs: + if not table.pps: log.warning( f'No `pps.toml` values could be loaded {brokername}:{acctid}' ) - return table, table.conf, table.pps + return table, table.conf @cm @@ -687,6 +697,7 @@ def open_pps( conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + pp_objs = {} table = PpTable(pp_objs, conf=conf) @@ -747,32 +758,33 @@ def open_pps( clears=clears, ) - orig = pp_objs.copy() + # orig = pp_objs.copy() try: yield table finally: - if orig != pp_objs: + # breakpoint() + # if orig != table.pps: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') - pp_entries, closed_pp_objs = table.dump_active(brokername) - conf[brokername][acctid] = pp_entries + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table - config.write( - conf, - 'pps', - encoder=enc, - ) + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -803,10 +815,7 @@ def update_pps_conf( for tid, r in trade_records.items(): ledger_reload[r.bsuid] = r.fqsn - # this maps `.bsuid` values to positions - pp_objs: dict[Union[str, int], Position] - - table, conf, pp_objs = load_pps_from_toml( + table, conf = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -817,7 +826,9 @@ def update_pps_conf( if trade_records: table.update_from_trans(trade_records) + # this maps `.bsuid` values to positions pp_entries, closed_pp_objs = table.dump_active(brokername) + pp_objs: dict[Union[str, int], Position] = table.pps conf[brokername][acctid] = pp_entries