From 45871d5846df441684af1cb4a3ea6f526f05b4c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Jul 2022 20:00:12 -0400 Subject: [PATCH 1/6] Freeze transactions, add todo notes for incr update --- piker/pp.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/piker/pp.py b/piker/pp.py index 88a7147d..4fe97ae6 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -96,7 +96,7 @@ def open_trade_ledger( return toml.dump(ledger, cf) -class Transaction(Struct): +class Transaction(Struct, frozen=True): # TODO: should this be ``.to`` (see below)? fqsn: str @@ -688,6 +688,18 @@ def update_pps_conf( dict[str, Position], dict[str, Position], ]: + # TODO: ideally we can pass in an existing + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. a couple ideas I can think of, + # - load pps once after backend ledger state + # is loaded and keep maintainend in memory + # inside a with block, + # - mirror this in some client side actor which + # does the actual ledger updates (say the paper + # engine proc if we decide to always spawn it?), + # - do diffs against updates from the ledger writer + # actor and the in-mem state here? # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] From 09d9a7ea2bff7c8e7cbda72ef314be93e99a227c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Jul 2022 10:33:56 -0400 Subject: [PATCH 2/6] Expect `.norm_trade_records()` to return `dict` --- piker/pp.py | 107 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 33 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 4fe97ae6..906d1f2c 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -314,7 +314,7 @@ def update_pps( pps: dict[str, Position] = pps or {} # lifo update all pps from records - for r in records: + for tid, r in records.items(): pp = pps.setdefault( r.bsuid, @@ -385,20 +385,24 @@ def load_pps_from_ledger( return {} brokermod = get_brokermod(brokername) - src_records = brokermod.norm_trade_records(ledger) + src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) if filter_by: + records = {} bsuids = set(filter_by) - records = list(filter(lambda r: r.bsuid in bsuids, src_records)) + for tid, r in src_records.items(): + if r.bsuid in bsuids: + records[tid] = r else: records = src_records return update_pps(records) -def get_pps( +@cm +def open_pps( brokername: str, - acctids: Optional[set[str]] = set(), + acctid: str, ) -> dict[str, dict[str, Position]]: ''' @@ -406,34 +410,68 @@ def get_pps( incremental update file: ``pps.toml``. ''' - conf, path = config.load( - 'pps', - # load dicts as inlines to preserve compactness - # _dict=toml.decoder.InlineTableDict, - ) + conf, path = config.load('pps') + brokersection = conf.setdefault(brokername, {}) + pps = brokersection.setdefault(acctid, {}) + pp_objs = {} - all_active = {} - all_closed = {} + # unmarshal/load ``pps.toml`` config entries into object form. + for fqsn, entry in pps.items(): + bsuid = entry['bsuid'] - # try to load any ledgers if no section found - bconf, path = config.load('brokers') - accounts = bconf[brokername]['accounts'] - for account in accounts: + # convert clears sub-tables (only in this form + # for toml re-presentation) back into a master table. + clears_list = entry['clears'] - # TODO: instead of this filter we could - # always send all known pps but just not audit - # them since an active client might not be up? - if ( - acctids and - f'{brokername}.{account}' not in acctids - ): - continue + # index clears entries in "object" form by tid in a top + # level dict instead of a list (as is presented in our + # ``pps.toml``). + pp = pp_objs.get(bsuid) + if pp: + clears = pp.clears + else: + clears = {} - active, closed = update_pps_conf(brokername, account) - all_active.setdefault(account, {}).update(active) - all_closed.setdefault(account, {}).update(closed) + for clears_table in clears_list: + tid = clears_table.pop('tid') + clears[tid] = clears_table - return all_active, all_closed + size = entry['size'] + + # TODO: an audit system for existing pps entries? + # if not len(clears) == abs(size): + # pp_objs = load_pps_from_ledger( + # brokername, + # acctid, + # filter_by=reload_records, + # ) + # reason = 'size <-> len(clears) mismatch' + # raise ValueError( + # '`pps.toml` entry is invalid:\n' + # f'{fqsn}\n' + # f'{pformat(entry)}' + # ) + + expiry = entry.get('expiry') + if expiry: + expiry = pendulum.parse(expiry) + + pp_objs[bsuid] = Position( + Symbol.from_fqsn(fqsn, info={}), + size=size, + be_price=entry['be_price'], + expiry=expiry, + bsuid=entry['bsuid'], + + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + clears=clears, + ) + + yield pp_objs # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -578,6 +616,9 @@ def load_pps_from_toml( # caller to pass in a symbol set they'd like to reload from the # underlying ledger to be reprocessed in computing pps state. reload_records: Optional[dict[str, str]] = None, + + # XXX: this is "global" update from ledger flag which + # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, ) -> tuple[dict, dict[str, Position]]: @@ -681,7 +722,7 @@ def update_pps_conf( brokername: str, acctid: str, - trade_records: Optional[list[Transaction]] = None, + trade_records: Optional[dict[str, Transaction]] = None, ledger_reload: Optional[dict[str, str]] = None, ) -> tuple[ @@ -701,13 +742,13 @@ def update_pps_conf( # - do diffs against updates from the ledger writer # actor and the in-mem state here? + if trade_records and ledger_reload: + for tid, r in trade_records.items(): + ledger_reload[r.bsuid] = r.fqsn + # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] - if trade_records and ledger_reload: - for r in trade_records: - ledger_reload[r.bsuid] = r.fqsn - conf, pp_objs = load_pps_from_toml( brokername, acctid, From 9326379b042a7cbb5d70d57a7b0a3b0b041e7615 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Jul 2022 12:23:02 -0400 Subject: [PATCH 3/6] Add a `PpTable` type, give it the update methods In an effort to begin allowing backends to have more granular control over position updates, particular in the case where they need to be reloaded from a trades ledger, this adds a new table API which can be loaded using `open_pps()`. - offer an `.update_trans()` method which takes in a `dict` of `Transactions` and updates the current table of `Positions` from it. - add a `.dump_active()` which renders the active pp entries dict in a format ready for toml serialization and all closed positions since the last update (we might want to not drop these?) All other module-function apis currently in use should remain working as before for the moment. --- piker/pp.py | 416 ++++++++++++++++++++++++++-------------------------- 1 file changed, 211 insertions(+), 205 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 906d1f2c..dd6ba3aa 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -302,6 +302,134 @@ class Position(Struct): return self.clears +class PpTable(Struct): + + pps: dict[str, Position] + conf: Optional[dict] = {} + + def update_from_trans( + self, + trans: dict[str, Transaction], + ) -> dict[str, Position]: + + pps = self.pps + + # lifo update all pps from records + for tid, r in trans.items(): + + pp = pps.setdefault( + r.bsuid, + + # if no existing pp, allocate fresh one. + Position( + Symbol.from_fqsn( + r.fqsn, + info={}, + ), + size=0.0, + be_price=0.0, + bsuid=r.bsuid, + expiry=r.expiry, + ) + ) + + # don't do updates for ledger records we already have + # included in the current pps state. + if r.tid in pp.clears: + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue + + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing data + pp.update(r) + + return pps + + def dump_active( + self, + brokername: str, + ) -> tuple[ + dict[str, Any], + dict[str, Position] + ]: + ''' + Iterate all tabulated positions, render active positions to + a ``dict`` format amenable to serialization (via TOML) and drop + from state (``.pps``) as well as return in a ``dict`` all + ``Position``s which have recently closed. + + ''' + # ONLY dict-serialize all active positions; those that are closed + # we don't store in the ``pps.toml``. + # NOTE: newly closed position are also important to report/return + # since a consumer, like an order mode UI ;), might want to react + # based on the closure. + pp_entries = {} + closed_pp_objs: dict[str, Position] = {} + + pp_objs = self.pps + for bsuid in list(pp_objs): + pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # if bsuid == 447767096: + # breakpoint() + + pp.minimize_clears() + + if ( + pp.size == 0 + + # drop time-expired positions (normally derivatives) + or (pp.expiry and pp.expiry < now()) + ): + # if expired the position is closed + pp.size = 0 + + # position is already closed aka "net zero" + closed_pp = pp_objs.pop(bsuid, None) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp + + else: + # serialize to pre-toml form + asdict = pp.to_pretoml() + + if pp.expiry is None: + asdict.pop('expiry', None) + + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = asdict.pop('symbol') + fqsn = s.front_fqsn() + log.info(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.removeprefix(f'{brokername}.') + + pp_entries[brokerless_key] = asdict + + return pp_entries, closed_pp_objs + + def update_pps( records: dict[str, Transaction], pps: Optional[dict[str, Position]] = None @@ -312,55 +440,10 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - - # lifo update all pps from records - for tid, r in records.items(): - - pp = pps.setdefault( - r.bsuid, - - # if no existing pp, allocate fresh one. - Position( - Symbol.from_fqsn( - r.fqsn, - info={}, - ), - size=0.0, - be_price=0.0, - bsuid=r.bsuid, - expiry=r.expiry, - ) - ) - - # don't do updates for ledger records we already have - # included in the current pps state. - if r.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue - - # lifo style "breakeven" price calc - pp.lifo_update( - r.size, - r.price, - - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost=2*r.cost, - ) - - # track clearing data - pp.update(r) - - return pps + return PpTable(pps).update_from_trans(records) -def load_pps_from_ledger( +def load_trans_from_ledger( brokername: str, acctname: str, @@ -396,82 +479,7 @@ def load_pps_from_ledger( else: records = src_records - return update_pps(records) - - -@cm -def open_pps( - brokername: str, - acctid: str, - -) -> dict[str, dict[str, Position]]: - ''' - Read out broker-specific position entries from - incremental update file: ``pps.toml``. - - ''' - conf, path = config.load('pps') - brokersection = conf.setdefault(brokername, {}) - pps = brokersection.setdefault(acctid, {}) - pp_objs = {} - - # unmarshal/load ``pps.toml`` config entries into object form. - for fqsn, entry in pps.items(): - bsuid = entry['bsuid'] - - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears_list = entry['clears'] - - # index clears entries in "object" form by tid in a top - # level dict instead of a list (as is presented in our - # ``pps.toml``). - pp = pp_objs.get(bsuid) - if pp: - clears = pp.clears - else: - clears = {} - - for clears_table in clears_list: - tid = clears_table.pop('tid') - clears[tid] = clears_table - - size = entry['size'] - - # TODO: an audit system for existing pps entries? - # if not len(clears) == abs(size): - # pp_objs = load_pps_from_ledger( - # brokername, - # acctid, - # filter_by=reload_records, - # ) - # reason = 'size <-> len(clears) mismatch' - # raise ValueError( - # '`pps.toml` entry is invalid:\n' - # f'{fqsn}\n' - # f'{pformat(entry)}' - # ) - - expiry = entry.get('expiry') - if expiry: - expiry = pendulum.parse(expiry) - - pp_objs[bsuid] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=size, - be_price=entry['be_price'], - expiry=expiry, - bsuid=entry['bsuid'], - - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, - ) - - yield pp_objs + return records # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -627,39 +635,63 @@ def load_pps_from_toml( ``pps.toml`` config, or from scratch from a ledger file when none yet exists. + ''' + with open_pps(brokername, acctid) as table: + pp_objs = table.pps + + # no pps entry yet for this broker/account so parse any available + # ledgers to build a brand new pps state. + if not pp_objs or update_from_ledger: + trans = load_trans_from_ledger( + brokername, + acctid, + ) + # TODO: just call `.update_from_trans()`? + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + # Reload symbol specific ledger entries if requested by the + # caller **AND** none exist in the current pps state table. + elif ( + pp_objs and reload_records + ): + # no pps entry yet for this broker/account so parse + # any available ledgers to build a pps state. + trans = load_trans_from_ledger( + brokername, + acctid, + filter_by=reload_records, + ) + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + if not pp_objs: + log.warning( + f'No `pps.toml` values could be loaded {brokername}:{acctid}' + ) + + return table, table.conf, table.pps + + +@cm +def open_pps( + brokername: str, + acctid: str, + +) -> dict[str, dict[str, Position]]: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + ''' conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) pp_objs = {} + table = PpTable(pp_objs, conf=conf) - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pps or update_from_ledger: - pp_objs = load_pps_from_ledger( - brokername, - acctid, - ) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pps and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - pp_objs = load_pps_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - - if not pps: - log.warning( - f'No `pps.toml` positions could be loaded {brokername}:{acctid}' - ) - - # unmarshal/load ``pps.toml`` config entries into object form. + # unmarshal/load ``pps.toml`` config entries into object form + # and update `PpTable` obj entries. for fqsn, entry in pps.items(): bsuid = entry['bsuid'] @@ -715,7 +747,32 @@ def load_pps_from_toml( clears=clears, ) - return conf, pp_objs + orig = pp_objs.copy() + try: + yield table + finally: + if orig != pp_objs: + + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') + + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries + + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table + + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -749,7 +806,7 @@ def update_pps_conf( # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] - conf, pp_objs = load_pps_from_toml( + table, conf, pp_objs = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -758,60 +815,9 @@ def update_pps_conf( # update all pp objects from any (new) trade records which # were passed in (aka incremental update case). if trade_records: - pp_objs = update_pps( - trade_records, - pps=pp_objs, - ) + table.update_from_trans(trade_records) - pp_entries = {} # dict-serialize all active pps - # NOTE: newly closed position are also important to report/return - # since a consumer, like an order mode UI ;), might want to react - # based on the closure. - closed_pp_objs: dict[str, Position] = {} - - for bsuid in list(pp_objs): - pp = pp_objs[bsuid] - - # XXX: debug hook for size mismatches - # if bsuid == 447767096: - # breakpoint() - - pp.minimize_clears() - - if ( - pp.size == 0 - - # drop time-expired positions (normally derivatives) - or (pp.expiry and pp.expiry < now()) - ): - # if expired the position is closed - pp.size = 0 - - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp - - else: - # serialize to pre-toml form - asdict = pp.to_pretoml() - - if pp.expiry is None: - asdict.pop('expiry', None) - - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = asdict.pop('symbol') - fqsn = s.front_fqsn() - log.info(f'Updating active pp: {fqsn}') - - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{brokername}.') - - pp_entries[brokerless_key] = asdict + pp_entries, closed_pp_objs = table.dump_active(brokername) conf[brokername][acctid] = pp_entries From 674783167710c1afdbc8892324731dc791d9f5ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Jul 2022 08:26:28 -0400 Subject: [PATCH 4/6] Don't pop zero pps from table in `.dump_active()` In order to avoid double transaction adds/updates and too-early-discard of zero sized pps (like when trades are loaded from a backend broker but were already added to a ledger or `pps.toml` prior) we now **don't** pop such `Position` entries from the `.pps` table in order to keep each position's clears table always in place. This avoids the edge case where an entry was removed too early (due to zero size) but then duplicate trade entries that were in that entrie's clears show up from the backend and are entered into a new entry resulting in an incorrect size in a new entry..We still only push non-net-zero entries to the `pps.toml`. More fixes: - return the updated set of `Positions` from `.lifo_update()`. - return the full table set from `update_pps()`. - use `PpTable.update_from_trans()` more throughout. - always write the `pps.toml` on `open_pps()` exit. - only return table from `load_pps_from_toml()`. --- piker/pp.py | 87 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index dd6ba3aa..55ae924f 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -34,7 +34,6 @@ from typing import ( Union, ) -from msgspec import Struct import pendulum from pendulum import datetime, now import tomli @@ -45,6 +44,7 @@ from .brokers import get_brokermod from .clearing._messages import BrokerdPosition, Status from .data._source import Symbol from .log import get_logger +from .data.types import Struct log = get_logger(__name__) @@ -314,6 +314,8 @@ class PpTable(Struct): pps = self.pps + updated: dict[str, Position] = {} + # lifo update all pps from records for tid, r in trans.items(): @@ -358,7 +360,9 @@ class PpTable(Struct): # track clearing data pp.update(r) - return pps + updated[r.bsuid] = pp + + return updated def dump_active( self, @@ -393,16 +397,23 @@ class PpTable(Struct): pp.minimize_clears() if ( + # "net-zero" is a "closed" position pp.size == 0 - # drop time-expired positions (normally derivatives) + # time-expired pps (normally derivatives) are "closed" or (pp.expiry and pp.expiry < now()) ): - # if expired the position is closed + # for expired cases pp.size = 0 - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) + # NOTE: we DO NOT pop the pp here since it can still be + # used to check for duplicate clears that may come in as + # new transaction from some backend API and need to be + # ignored; the closed positions won't be written to the + # ``pps.toml`` since ``pp_entries`` above is what's + # written. + # closed_pp = pp_objs.pop(bsuid, None) + closed_pp = pp_objs.get(bsuid) if closed_pp: closed_pp_objs[bsuid] = closed_pp @@ -440,7 +451,9 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - return PpTable(pps).update_from_trans(records) + table = PpTable(pps) + table.update_from_trans(records) + return table.pps def load_trans_from_ledger( @@ -629,7 +642,7 @@ def load_pps_from_toml( # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, -) -> tuple[dict, dict[str, Position]]: +) -> tuple[PpTable, dict[str, str]]: ''' Load and marshal to objects all pps from either an existing ``pps.toml`` config, or from scratch from a ledger file when @@ -646,9 +659,7 @@ def load_pps_from_toml( brokername, acctid, ) - # TODO: just call `.update_from_trans()`? - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) # Reload symbol specific ledger entries if requested by the # caller **AND** none exist in the current pps state table. @@ -662,15 +673,14 @@ def load_pps_from_toml( acctid, filter_by=reload_records, ) - ledger_pp_objs = update_pps(trans) - pp_objs.update(ledger_pp_objs) + table.update_from_trans(trans) - if not pp_objs: + if not table.pps: log.warning( f'No `pps.toml` values could be loaded {brokername}:{acctid}' ) - return table, table.conf, table.pps + return table, table.conf @cm @@ -687,6 +697,7 @@ def open_pps( conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + pp_objs = {} table = PpTable(pp_objs, conf=conf) @@ -747,32 +758,33 @@ def open_pps( clears=clears, ) - orig = pp_objs.copy() + # orig = pp_objs.copy() try: yield table finally: - if orig != pp_objs: + # breakpoint() + # if orig != table.pps: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') - pp_entries, closed_pp_objs = table.dump_active(brokername) - conf[brokername][acctid] = pp_entries + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table - config.write( - conf, - 'pps', - encoder=enc, - ) + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -803,10 +815,7 @@ def update_pps_conf( for tid, r in trade_records.items(): ledger_reload[r.bsuid] = r.fqsn - # this maps `.bsuid` values to positions - pp_objs: dict[Union[str, int], Position] - - table, conf, pp_objs = load_pps_from_toml( + table, conf = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -817,7 +826,9 @@ def update_pps_conf( if trade_records: table.update_from_trans(trade_records) + # this maps `.bsuid` values to positions pp_entries, closed_pp_objs = table.dump_active(brokername) + pp_objs: dict[Union[str, int], Position] = table.pps conf[brokername][acctid] = pp_entries From ddb195ed2cbb3e6b305cb190a3d56efe8125c247 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Jul 2022 10:12:51 -0400 Subject: [PATCH 5/6] Add a flag to prevent writing `pps.toml` on exit --- piker/pp.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 55ae924f..ece6e355 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -391,7 +391,8 @@ class PpTable(Struct): pp = pp_objs[bsuid] # XXX: debug hook for size mismatches - # if bsuid == 447767096: + # qqqbsuid = 320227571 + # if bsuid == qqqbsuid: # breakpoint() pp.minimize_clears() @@ -649,7 +650,11 @@ def load_pps_from_toml( none yet exists. ''' - with open_pps(brokername, acctid) as table: + with open_pps( + brokername, + acctid, + write_on_exit=False, + ) as table: pp_objs = table.pps # no pps entry yet for this broker/account so parse any available @@ -687,8 +692,9 @@ def load_pps_from_toml( def open_pps( brokername: str, acctid: str, + write_on_exit: bool = True, -) -> dict[str, dict[str, Position]]: +) -> PpTable: ''' Read out broker-specific position entries from incremental update file: ``pps.toml``. @@ -765,6 +771,9 @@ def open_pps( # breakpoint() # if orig != table.pps: + if not write_on_exit: + return + # TODO: show diff output? # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries print(f'Updating ``pps.toml`` for {path}:\n') From 5684120c11da72d058c75e6fe07d3e72d8717c73 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Jul 2022 15:28:04 -0400 Subject: [PATCH 6/6] Wow, drop idiotic `return` inside `finally:` Can't believe i missed this but any `return` inside a `finally` will suppress the error from the `try:` part... XD Thought i was losing my mind when the ledger was mutated and then an error just after wasn't getting raised.. lul. Never again... --- piker/pp.py | 67 +++++++++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index ece6e355..42036ead 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -82,18 +82,18 @@ def open_trade_ledger( ledger = tomli.load(cf) print(f'Ledger load took {time.time() - start}s') cpy = ledger.copy() - try: - yield cpy - finally: - if cpy != ledger: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ledger for {tradesfile}:\n') - ledger.update(cpy) - # we write on close the mutated ledger data - with open(tradesfile, 'w') as cf: - return toml.dump(ledger, cf) + yield cpy + + if cpy != ledger: + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ledger for {tradesfile}:\n') + ledger.update(cpy) + + # we write on close the mutated ledger data + with open(tradesfile, 'w') as cf: + toml.dump(ledger, cf) class Transaction(Struct, frozen=True): @@ -764,36 +764,31 @@ def open_pps( clears=clears, ) - # orig = pp_objs.copy() - try: - yield table - finally: - # breakpoint() - # if orig != table.pps: + yield table - if not write_on_exit: - return + if not write_on_exit: + return - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') - pp_entries, closed_pp_objs = table.dump_active(brokername) - conf[brokername][acctid] = pp_entries + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table - config.write( - conf, - 'pps', - encoder=enc, - ) + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf(