From 09d9a7ea2bff7c8e7cbda72ef314be93e99a227c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Jul 2022 10:33:56 -0400 Subject: [PATCH] Expect `.norm_trade_records()` to return `dict` --- piker/pp.py | 107 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 33 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 4fe97ae6..906d1f2c 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -314,7 +314,7 @@ def update_pps( pps: dict[str, Position] = pps or {} # lifo update all pps from records - for r in records: + for tid, r in records.items(): pp = pps.setdefault( r.bsuid, @@ -385,20 +385,24 @@ def load_pps_from_ledger( return {} brokermod = get_brokermod(brokername) - src_records = brokermod.norm_trade_records(ledger) + src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) if filter_by: + records = {} bsuids = set(filter_by) - records = list(filter(lambda r: r.bsuid in bsuids, src_records)) + for tid, r in src_records.items(): + if r.bsuid in bsuids: + records[tid] = r else: records = src_records return update_pps(records) -def get_pps( +@cm +def open_pps( brokername: str, - acctids: Optional[set[str]] = set(), + acctid: str, ) -> dict[str, dict[str, Position]]: ''' @@ -406,34 +410,68 @@ def get_pps( incremental update file: ``pps.toml``. ''' - conf, path = config.load( - 'pps', - # load dicts as inlines to preserve compactness - # _dict=toml.decoder.InlineTableDict, - ) + conf, path = config.load('pps') + brokersection = conf.setdefault(brokername, {}) + pps = brokersection.setdefault(acctid, {}) + pp_objs = {} - all_active = {} - all_closed = {} + # unmarshal/load ``pps.toml`` config entries into object form. + for fqsn, entry in pps.items(): + bsuid = entry['bsuid'] - # try to load any ledgers if no section found - bconf, path = config.load('brokers') - accounts = bconf[brokername]['accounts'] - for account in accounts: + # convert clears sub-tables (only in this form + # for toml re-presentation) back into a master table. + clears_list = entry['clears'] - # TODO: instead of this filter we could - # always send all known pps but just not audit - # them since an active client might not be up? - if ( - acctids and - f'{brokername}.{account}' not in acctids - ): - continue + # index clears entries in "object" form by tid in a top + # level dict instead of a list (as is presented in our + # ``pps.toml``). + pp = pp_objs.get(bsuid) + if pp: + clears = pp.clears + else: + clears = {} - active, closed = update_pps_conf(brokername, account) - all_active.setdefault(account, {}).update(active) - all_closed.setdefault(account, {}).update(closed) + for clears_table in clears_list: + tid = clears_table.pop('tid') + clears[tid] = clears_table - return all_active, all_closed + size = entry['size'] + + # TODO: an audit system for existing pps entries? + # if not len(clears) == abs(size): + # pp_objs = load_pps_from_ledger( + # brokername, + # acctid, + # filter_by=reload_records, + # ) + # reason = 'size <-> len(clears) mismatch' + # raise ValueError( + # '`pps.toml` entry is invalid:\n' + # f'{fqsn}\n' + # f'{pformat(entry)}' + # ) + + expiry = entry.get('expiry') + if expiry: + expiry = pendulum.parse(expiry) + + pp_objs[bsuid] = Position( + Symbol.from_fqsn(fqsn, info={}), + size=size, + be_price=entry['be_price'], + expiry=expiry, + bsuid=entry['bsuid'], + + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + clears=clears, + ) + + yield pp_objs # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -578,6 +616,9 @@ def load_pps_from_toml( # caller to pass in a symbol set they'd like to reload from the # underlying ledger to be reprocessed in computing pps state. reload_records: Optional[dict[str, str]] = None, + + # XXX: this is "global" update from ledger flag which + # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, ) -> tuple[dict, dict[str, Position]]: @@ -681,7 +722,7 @@ def update_pps_conf( brokername: str, acctid: str, - trade_records: Optional[list[Transaction]] = None, + trade_records: Optional[dict[str, Transaction]] = None, ledger_reload: Optional[dict[str, str]] = None, ) -> tuple[ @@ -701,13 +742,13 @@ def update_pps_conf( # - do diffs against updates from the ledger writer # actor and the in-mem state here? + if trade_records and ledger_reload: + for tid, r in trade_records.items(): + ledger_reload[r.bsuid] = r.fqsn + # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] - if trade_records and ledger_reload: - for r in trade_records: - ledger_reload[r.bsuid] = r.fqsn - conf, pp_objs = load_pps_from_toml( brokername, acctid,