diff --git a/piker/pp.py b/piker/pp.py index 68663f89..3860bf36 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -33,6 +33,7 @@ from typing import ( from msgspec import Struct import pendulum from pendulum import datetime, now +# import tomli import toml from . import config @@ -357,6 +358,9 @@ def load_pps_from_ledger( brokername: str, acctname: str, + # post normalization filter on ledger entries to be processed + filter_by: Optional[list[Transaction]] = None, + ) -> dict[str, Position]: ''' Open a ledger file by broker name and account and read in and @@ -369,14 +373,17 @@ def load_pps_from_ledger( brokername, acctname, ) as ledger: - pass # readonly - - if not ledger: - # null case, no ledger file with content - return {} + if not ledger: + # null case, no ledger file with content + return {} brokermod = get_brokermod(brokername) records = brokermod.norm_trade_records(ledger) + + if filter_by: + bsuids = set(r.bsuid for r in filter_by) + records = filter(lambda r: r.bsuid in bsuids, records) + return update_pps(records) @@ -551,7 +558,15 @@ class PpsEncoder(toml.TomlEncoder): def load_pps_from_toml( brokername: str, - acctid: str + acctid: str, + + # XXX: there is an edge case here where we may want to either audit + # the retrieved ``pps.toml`` output or reprocess it since there was + # an error on write on the last attempt to update the state file + # even though the ledger *was* updated. For this cases we allow the + # caller to pass in a symbol set they'd like to reload from the + # underlying ledger to be reprocessed in computing pps state. + reload_records: Optional[list[Transaction]] = None, ) -> tuple[dict, dict[str, Position]]: ''' @@ -563,6 +578,7 @@ def load_pps_from_toml( conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + pp_objs = {} if not pps: # no pps entry yet for this broker/account so parse @@ -571,46 +587,59 @@ def load_pps_from_toml( brokername, acctid, ) - if not pps: - log.warning( - f'No trade history could be loaded for {brokername}:{acctid}' - ) - else: - # unmarshal/load ``pps.toml`` config entries into object form. - pp_objs = {} - for fqsn, entry in pps.items(): + # Reload symbol specific ledger entries if requested by the + # caller **AND** none exist in the current pps state table. + elif ( + pps and reload_records and + not any(r.fqsn in pps for r in reload_records) + ): + # no pps entry yet for this broker/account so parse + # any available ledgers to build a pps state. + pp_objs = load_pps_from_ledger( + brokername, + acctid, + filter_by=reload_records, + ) - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears_list = entry['clears'] + if not pps: + log.warning( + f'No trade history could be loaded for {brokername}:{acctid}' + ) - # index clears entries in "object" form by tid in a top - # level dict instead of a list (as is presented in our - # ``pps.toml``). - clears = {} - for clears_table in clears_list: - tid = clears_table.pop('tid') - clears[tid] = clears_table + # unmarshal/load ``pps.toml`` config entries into object form. + for fqsn, entry in pps.items(): - expiry = entry.get('expiry') - if expiry: - expiry = pendulum.parse(expiry) + # convert clears sub-tables (only in this form + # for toml re-presentation) back into a master table. + clears_list = entry['clears'] - pp_objs[fqsn] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=entry['size'], - be_price=entry['be_price'], - expiry=expiry, - bsuid=entry['bsuid'], + # index clears entries in "object" form by tid in a top + # level dict instead of a list (as is presented in our + # ``pps.toml``). + clears = {} + for clears_table in clears_list: + tid = clears_table.pop('tid') + clears[tid] = clears_table - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, - ) + expiry = entry.get('expiry') + if expiry: + expiry = pendulum.parse(expiry) + + pp_objs[fqsn] = Position( + Symbol.from_fqsn(fqsn, info={}), + size=entry['size'], + be_price=entry['be_price'], + expiry=expiry, + bsuid=entry['bsuid'], + + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + clears=clears, + ) return conf, pp_objs @@ -618,12 +647,17 @@ def load_pps_from_toml( def update_pps_conf( brokername: str, acctid: str, + trade_records: Optional[list[Transaction]] = None, key_by: Optional[str] = None, ) -> dict[str, Position]: - conf, pp_objs = load_pps_from_toml(brokername, acctid) + conf, pp_objs = load_pps_from_toml( + brokername, + acctid, + reload_records=trade_records, + ) # update all pp objects from any (new) trade records which # were passed in (aka incremental update case).