diff --git a/piker/pp.py b/piker/pp.py index 906d1f2c..dd6ba3aa 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -302,6 +302,134 @@ class Position(Struct): return self.clears +class PpTable(Struct): + + pps: dict[str, Position] + conf: Optional[dict] = {} + + def update_from_trans( + self, + trans: dict[str, Transaction], + ) -> dict[str, Position]: + + pps = self.pps + + # lifo update all pps from records + for tid, r in trans.items(): + + pp = pps.setdefault( + r.bsuid, + + # if no existing pp, allocate fresh one. + Position( + Symbol.from_fqsn( + r.fqsn, + info={}, + ), + size=0.0, + be_price=0.0, + bsuid=r.bsuid, + expiry=r.expiry, + ) + ) + + # don't do updates for ledger records we already have + # included in the current pps state. + if r.tid in pp.clears: + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue + + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing data + pp.update(r) + + return pps + + def dump_active( + self, + brokername: str, + ) -> tuple[ + dict[str, Any], + dict[str, Position] + ]: + ''' + Iterate all tabulated positions, render active positions to + a ``dict`` format amenable to serialization (via TOML) and drop + from state (``.pps``) as well as return in a ``dict`` all + ``Position``s which have recently closed. + + ''' + # ONLY dict-serialize all active positions; those that are closed + # we don't store in the ``pps.toml``. + # NOTE: newly closed position are also important to report/return + # since a consumer, like an order mode UI ;), might want to react + # based on the closure. + pp_entries = {} + closed_pp_objs: dict[str, Position] = {} + + pp_objs = self.pps + for bsuid in list(pp_objs): + pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # if bsuid == 447767096: + # breakpoint() + + pp.minimize_clears() + + if ( + pp.size == 0 + + # drop time-expired positions (normally derivatives) + or (pp.expiry and pp.expiry < now()) + ): + # if expired the position is closed + pp.size = 0 + + # position is already closed aka "net zero" + closed_pp = pp_objs.pop(bsuid, None) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp + + else: + # serialize to pre-toml form + asdict = pp.to_pretoml() + + if pp.expiry is None: + asdict.pop('expiry', None) + + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = asdict.pop('symbol') + fqsn = s.front_fqsn() + log.info(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.removeprefix(f'{brokername}.') + + pp_entries[brokerless_key] = asdict + + return pp_entries, closed_pp_objs + + def update_pps( records: dict[str, Transaction], pps: Optional[dict[str, Position]] = None @@ -312,55 +440,10 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - - # lifo update all pps from records - for tid, r in records.items(): - - pp = pps.setdefault( - r.bsuid, - - # if no existing pp, allocate fresh one. - Position( - Symbol.from_fqsn( - r.fqsn, - info={}, - ), - size=0.0, - be_price=0.0, - bsuid=r.bsuid, - expiry=r.expiry, - ) - ) - - # don't do updates for ledger records we already have - # included in the current pps state. - if r.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue - - # lifo style "breakeven" price calc - pp.lifo_update( - r.size, - r.price, - - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost=2*r.cost, - ) - - # track clearing data - pp.update(r) - - return pps + return PpTable(pps).update_from_trans(records) -def load_pps_from_ledger( +def load_trans_from_ledger( brokername: str, acctname: str, @@ -396,82 +479,7 @@ def load_pps_from_ledger( else: records = src_records - return update_pps(records) - - -@cm -def open_pps( - brokername: str, - acctid: str, - -) -> dict[str, dict[str, Position]]: - ''' - Read out broker-specific position entries from - incremental update file: ``pps.toml``. - - ''' - conf, path = config.load('pps') - brokersection = conf.setdefault(brokername, {}) - pps = brokersection.setdefault(acctid, {}) - pp_objs = {} - - # unmarshal/load ``pps.toml`` config entries into object form. - for fqsn, entry in pps.items(): - bsuid = entry['bsuid'] - - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears_list = entry['clears'] - - # index clears entries in "object" form by tid in a top - # level dict instead of a list (as is presented in our - # ``pps.toml``). - pp = pp_objs.get(bsuid) - if pp: - clears = pp.clears - else: - clears = {} - - for clears_table in clears_list: - tid = clears_table.pop('tid') - clears[tid] = clears_table - - size = entry['size'] - - # TODO: an audit system for existing pps entries? - # if not len(clears) == abs(size): - # pp_objs = load_pps_from_ledger( - # brokername, - # acctid, - # filter_by=reload_records, - # ) - # reason = 'size <-> len(clears) mismatch' - # raise ValueError( - # '`pps.toml` entry is invalid:\n' - # f'{fqsn}\n' - # f'{pformat(entry)}' - # ) - - expiry = entry.get('expiry') - if expiry: - expiry = pendulum.parse(expiry) - - pp_objs[bsuid] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=size, - be_price=entry['be_price'], - expiry=expiry, - bsuid=entry['bsuid'], - - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, - ) - - yield pp_objs + return records # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -627,39 +635,63 @@ def load_pps_from_toml( ``pps.toml`` config, or from scratch from a ledger file when none yet exists. + ''' + with open_pps(brokername, acctid) as table: + pp_objs = table.pps + + # no pps entry yet for this broker/account so parse any available + # ledgers to build a brand new pps state. + if not pp_objs or update_from_ledger: + trans = load_trans_from_ledger( + brokername, + acctid, + ) + # TODO: just call `.update_from_trans()`? + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + # Reload symbol specific ledger entries if requested by the + # caller **AND** none exist in the current pps state table. + elif ( + pp_objs and reload_records + ): + # no pps entry yet for this broker/account so parse + # any available ledgers to build a pps state. + trans = load_trans_from_ledger( + brokername, + acctid, + filter_by=reload_records, + ) + ledger_pp_objs = update_pps(trans) + pp_objs.update(ledger_pp_objs) + + if not pp_objs: + log.warning( + f'No `pps.toml` values could be loaded {brokername}:{acctid}' + ) + + return table, table.conf, table.pps + + +@cm +def open_pps( + brokername: str, + acctid: str, + +) -> dict[str, dict[str, Position]]: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + ''' conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) pp_objs = {} + table = PpTable(pp_objs, conf=conf) - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pps or update_from_ledger: - pp_objs = load_pps_from_ledger( - brokername, - acctid, - ) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pps and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - pp_objs = load_pps_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - - if not pps: - log.warning( - f'No `pps.toml` positions could be loaded {brokername}:{acctid}' - ) - - # unmarshal/load ``pps.toml`` config entries into object form. + # unmarshal/load ``pps.toml`` config entries into object form + # and update `PpTable` obj entries. for fqsn, entry in pps.items(): bsuid = entry['bsuid'] @@ -715,7 +747,32 @@ def load_pps_from_toml( clears=clears, ) - return conf, pp_objs + orig = pp_objs.copy() + try: + yield table + finally: + if orig != pp_objs: + + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') + + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries + + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table + + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( @@ -749,7 +806,7 @@ def update_pps_conf( # this maps `.bsuid` values to positions pp_objs: dict[Union[str, int], Position] - conf, pp_objs = load_pps_from_toml( + table, conf, pp_objs = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -758,60 +815,9 @@ def update_pps_conf( # update all pp objects from any (new) trade records which # were passed in (aka incremental update case). if trade_records: - pp_objs = update_pps( - trade_records, - pps=pp_objs, - ) + table.update_from_trans(trade_records) - pp_entries = {} # dict-serialize all active pps - # NOTE: newly closed position are also important to report/return - # since a consumer, like an order mode UI ;), might want to react - # based on the closure. - closed_pp_objs: dict[str, Position] = {} - - for bsuid in list(pp_objs): - pp = pp_objs[bsuid] - - # XXX: debug hook for size mismatches - # if bsuid == 447767096: - # breakpoint() - - pp.minimize_clears() - - if ( - pp.size == 0 - - # drop time-expired positions (normally derivatives) - or (pp.expiry and pp.expiry < now()) - ): - # if expired the position is closed - pp.size = 0 - - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp - - else: - # serialize to pre-toml form - asdict = pp.to_pretoml() - - if pp.expiry is None: - asdict.pop('expiry', None) - - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = asdict.pop('symbol') - fqsn = s.front_fqsn() - log.info(f'Updating active pp: {fqsn}') - - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{brokername}.') - - pp_entries[brokerless_key] = asdict + pp_entries, closed_pp_objs = table.dump_active(brokername) conf[brokername][acctid] = pp_entries