From 9326379b042a7cbb5d70d57a7b0a3b0b041e7615 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Jul 2022 12:23:02 -0400 Subject: [PATCH] Add a `PpTable` type, give it the update methods In an effort to begin allowing backends to have more granular control over position updates, particular in the case where they need to be reloaded from a trades ledger, this adds a new table API which can be loaded using `open_pps()`. - offer an `.update_trans()` method which takes in a `dict` of `Transactions` and updates the current table of `Positions` from it. - add a `.dump_active()` which renders the active pp entries dict in a format ready for toml serialization and all closed positions since the last update (we might want to not drop these?) All other module-function apis currently in use should remain working as before for the moment. --- piker/pp.py | 416 ++++++++++++++++++++++++++-------------------------- 1 file changed, 211 insertions(+), 205 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 906d1f2c..dd6ba3aa 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -302,6 +302,134 @@ class Position(Struct): return self.clears +class PpTable(Struct): + + pps: dict[str, Position] + conf: Optional[dict] = {} + + def update_from_trans( + self, + trans: dict[str, Transaction], + ) -> dict[str, Position]: + + pps = self.pps + + # lifo update all pps from records + for tid, r in trans.items(): + + pp = pps.setdefault( + r.bsuid, + + # if no existing pp, allocate fresh one. + Position( + Symbol.from_fqsn( + r.fqsn, + info={}, + ), + size=0.0, + be_price=0.0, + bsuid=r.bsuid, + expiry=r.expiry, + ) + ) + + # don't do updates for ledger records we already have + # included in the current pps state. + if r.tid in pp.clears: + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue + + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing data + pp.update(r) + + return pps + + def dump_active( + self, + brokername: str, + ) -> tuple[ + dict[str, Any], + dict[str, Position] + ]: + ''' + Iterate all tabulated positions, render active positions to + a ``dict`` format amenable to serialization (via TOML) and drop + from state (``.pps``) as well as return in a ``dict`` all + ``Position``s which have recently closed. + + ''' + # ONLY dict-serialize all active positions; those that are closed + # we don't store in the ``pps.toml``. + # NOTE: newly closed position are also important to report/return + # since a consumer, like an order mode UI ;), might want to react + # based on the closure. + pp_entries = {} + closed_pp_objs: dict[str, Position] = {} + + pp_objs = self.pps + for bsuid in list(pp_objs): + pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # if bsuid == 447767096: + # breakpoint() + + pp.minimize_clears() + + if ( + pp.size == 0 + + # drop time-expired positions (normally derivatives) + or (pp.expiry and pp.expiry < now()) + ): + # if expired the position is closed + pp.size = 0 + + # position is already closed aka "net zero" + closed_pp = pp_objs.pop(bsuid, None) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp + + else: + # serialize to pre-toml form + asdict = pp.to_pretoml() + + if pp.expiry is None: + asdict.pop('expiry', None) + + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = asdict.pop('symbol') + fqsn = s.front_fqsn() + log.info(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.removeprefix(f'{brokername}.') + + pp_entries[brokerless_key] = asdict + + return pp_entries, closed_pp_objs + + def update_pps( records: dict[str, Transaction], pps: Optional[dict[str, Position]] = None @@ -312,55 +440,10 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - - # lifo update all pps from records - for tid, r in records.items(): - - pp = pps.setdefault( - r.bsuid, - - # if no existing pp, allocate fresh one. - Position( - Symbol.from_fqsn( - r.fqsn, - info={}, - ), - size=0.0, - be_price=0.0, - bsuid=r.bsuid, - expiry=r.expiry, - ) - ) - - # don't do updates for ledger records we already have - # included in the current pps state. - if r.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue - - # lifo style "breakeven" price calc - pp.lifo_update( - r.size, - r.price, - - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost=2*r.cost, - ) - - # track clearing data - pp.update(r) - - return pps + return PpTable(pps).update_from_trans(records) -def load_pps_from_ledger( +def load_trans_from_ledger( brokername: str, acctname: str, @@ -396,82 +479,7 @@ def load_pps_from_ledger( else: records = src_records - return update_pps(records) - - -@cm -def open_pps( - brokername: str, - acctid: str, - -) -> dict[str, dict[str, Position]]: - ''' - Read out broker-specific position entries from - incremental update file: ``pps.toml``. - - ''' - conf, path = config.load('pps') - brokersection = conf.setdefault(brokername, {}) - pps = brokersection.setdefault(acctid, {}) - pp_objs = {} - - # unmarshal/load ``pps.toml`` config entries into object form. - for fqsn, entry in pps.items(): - bsuid = entry['bsuid'] - - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears_list = entry['clears'] - - # index clears entries in "object" form by tid in a top - # level dict instead of a list (as is presented in our - # ``pps.toml``). - pp = pp_objs.get(bsuid) - if pp: - clears = pp.clears - else: - clears = {} - - for clears_table in clears_list: - tid = clears_table.pop('tid') - clears[tid] = clears_table - - size = entry['size'] - - # TODO: an audit system for existing pps entries? - # if not len(clears) == abs(size): - # pp_objs = load_pps_from_ledger( - # brokername, - # acctid, - # filter_by=reload_records, - # ) - # reason = 'size <-> len(clears) mismatch' - # raise ValueError( - # '`pps.toml` entry is invalid:\n' - # f'{fqsn}\n' - # f'{pformat(entry)}' - # ) - - expiry = entry.get('expiry') - if expiry: - expiry = pendulum.parse(expiry) - - pp_objs[bsuid] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=size, - be_price=entry['be_price'], - expiry=expiry, - bsuid=entry['bsuid'], - - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, - ) - - yield pp_objs + return records # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -627,39 +635,63 @@ def load_pps_from_toml( ``pps.toml`` config, or from scratch from a ledger file when none yet exists. + ''' + with open_pps(brokername, acctid) as table: + pp_objs = table.pps + + # no pps entry yet for this broker/account so parse any available + # ledgers to build a brand new pps state. + if not pp_objs or update_from_ledger: + trans = load_trans_from_ledger( + brokername, + acctid, + ) + # TODO: just call `.update_from_trans()`? + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + # Reload symbol specific ledger entries if requested by the + # caller **AND** none exist in the current pps state table. + elif ( + pp_objs and reload_records + ): + # no pps entry yet for this broker/account so parse + # any available ledgers to build a pps state. + trans = load_trans_from_ledger( + brokername, + acctid, + filter_by=reload_records, + ) + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + if not pp_objs: + log.warning( + f'No `pps.toml` values could be loaded {brokername}:{acctid}' + ) + + return table, table.conf, table.pps + + +@cm +def open_pps( + brokername: str, + acctid: str, + +) -> dict[str, dict[str, Position]]: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + ''' conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) pp_objs = {} + table = PpTable(pp_objs, conf=conf) - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pps or update_from_ledger: - pp_objs = load_pps_from_ledger( - brokername, - acctid, - ) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pps and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - pp_objs = load_pps_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - - if not pps: - log.warning( - f'No `pps.toml` positions could be loaded {brokername}:{acctid}' - ) - - # unmarshal/load ``pps.toml`` config entries into object form. + # unmarshal/load ``pps.toml`` config entries into object form + # and update `PpTable` obj entries. for fqsn, entry in pps.items(): bsuid = entry['bsuid'] @@ -715,7 +747,32 @@ def load_pps_from_toml( clears=clears, ) - return conf, pp_objs + orig = pp_objs.copy() + try: + yield table + finally: + if orig != pp_objs: + + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') + + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries + + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table + + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -749,7 +806,7 @@ def update_pps_conf( # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] - conf, pp_objs = load_pps_from_toml( + table, conf, pp_objs = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -758,60 +815,9 @@ def update_pps_conf( # update all pp objects from any (new) trade records which # were passed in (aka incremental update case). if trade_records: - pp_objs = update_pps( - trade_records, - pps=pp_objs, - ) + table.update_from_trans(trade_records) - pp_entries = {} # dict-serialize all active pps - # NOTE: newly closed position are also important to report/return - # since a consumer, like an order mode UI ;), might want to react - # based on the closure. - closed_pp_objs: dict[str, Position] = {} - - for bsuid in list(pp_objs): - pp = pp_objs[bsuid] - - # XXX: debug hook for size mismatches - # if bsuid == 447767096: - # breakpoint() - - pp.minimize_clears() - - if ( - pp.size == 0 - - # drop time-expired positions (normally derivatives) - or (pp.expiry and pp.expiry < now()) - ): - # if expired the position is closed - pp.size = 0 - - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp - - else: - # serialize to pre-toml form - asdict = pp.to_pretoml() - - if pp.expiry is None: - asdict.pop('expiry', None) - - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = asdict.pop('symbol') - fqsn = s.front_fqsn() - log.info(f'Updating active pp: {fqsn}') - - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{brokername}.') - - pp_entries[brokerless_key] = asdict + pp_entries, closed_pp_objs = table.dump_active(brokername) conf[brokername][acctid] = pp_entries