diff --git a/piker/pp.py b/piker/pp.py index 88a7147d..42036ead 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -34,7 +34,6 @@ from typing import ( Union, ) -from msgspec import Struct import pendulum from pendulum import datetime, now import tomli @@ -45,6 +44,7 @@ from .brokers import get_brokermod from .clearing._messages import BrokerdPosition, Status from .data._source import Symbol from .log import get_logger +from .data.types import Struct log = get_logger(__name__) @@ -82,21 +82,21 @@ def open_trade_ledger( ledger = tomli.load(cf) print(f'Ledger load took {time.time() - start}s') cpy = ledger.copy() - try: - yield cpy - finally: - if cpy != ledger: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ledger for {tradesfile}:\n') - ledger.update(cpy) - # we write on close the mutated ledger data - with open(tradesfile, 'w') as cf: - return toml.dump(ledger, cf) + yield cpy + + if cpy != ledger: + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ledger for {tradesfile}:\n') + ledger.update(cpy) + + # we write on close the mutated ledger data + with open(tradesfile, 'w') as cf: + toml.dump(ledger, cf) -class Transaction(Struct): +class Transaction(Struct, frozen=True): # TODO: should this be ``.to`` (see below)? fqsn: str @@ -302,6 +302,146 @@ class Position(Struct): return self.clears +class PpTable(Struct): + + pps: dict[str, Position] + conf: Optional[dict] = {} + + def update_from_trans( + self, + trans: dict[str, Transaction], + ) -> dict[str, Position]: + + pps = self.pps + + updated: dict[str, Position] = {} + + # lifo update all pps from records + for tid, r in trans.items(): + + pp = pps.setdefault( + r.bsuid, + + # if no existing pp, allocate fresh one. + Position( + Symbol.from_fqsn( + r.fqsn, + info={}, + ), + size=0.0, + be_price=0.0, + bsuid=r.bsuid, + expiry=r.expiry, + ) + ) + + # don't do updates for ledger records we already have + # included in the current pps state. + if r.tid in pp.clears: + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue + + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing data + pp.update(r) + + updated[r.bsuid] = pp + + return updated + + def dump_active( + self, + brokername: str, + ) -> tuple[ + dict[str, Any], + dict[str, Position] + ]: + ''' + Iterate all tabulated positions, render active positions to + a ``dict`` format amenable to serialization (via TOML) and drop + from state (``.pps``) as well as return in a ``dict`` all + ``Position``s which have recently closed. + + ''' + # ONLY dict-serialize all active positions; those that are closed + # we don't store in the ``pps.toml``. + # NOTE: newly closed position are also important to report/return + # since a consumer, like an order mode UI ;), might want to react + # based on the closure. + pp_entries = {} + closed_pp_objs: dict[str, Position] = {} + + pp_objs = self.pps + for bsuid in list(pp_objs): + pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # qqqbsuid = 320227571 + # if bsuid == qqqbsuid: + # breakpoint() + + pp.minimize_clears() + + if ( + # "net-zero" is a "closed" position + pp.size == 0 + + # time-expired pps (normally derivatives) are "closed" + or (pp.expiry and pp.expiry < now()) + ): + # for expired cases + pp.size = 0 + + # NOTE: we DO NOT pop the pp here since it can still be + # used to check for duplicate clears that may come in as + # new transaction from some backend API and need to be + # ignored; the closed positions won't be written to the + # ``pps.toml`` since ``pp_entries`` above is what's + # written. + # closed_pp = pp_objs.pop(bsuid, None) + closed_pp = pp_objs.get(bsuid) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp + + else: + # serialize to pre-toml form + asdict = pp.to_pretoml() + + if pp.expiry is None: + asdict.pop('expiry', None) + + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = asdict.pop('symbol') + fqsn = s.front_fqsn() + log.info(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.removeprefix(f'{brokername}.') + + pp_entries[brokerless_key] = asdict + + return pp_entries, closed_pp_objs + + def update_pps( records: dict[str, Transaction], pps: Optional[dict[str, Position]] = None @@ -312,55 +452,12 @@ def update_pps( ''' pps: dict[str, Position] = pps or {} - - # lifo update all pps from records - for r in records: - - pp = pps.setdefault( - r.bsuid, - - # if no existing pp, allocate fresh one. - Position( - Symbol.from_fqsn( - r.fqsn, - info={}, - ), - size=0.0, - be_price=0.0, - bsuid=r.bsuid, - expiry=r.expiry, - ) - ) - - # don't do updates for ledger records we already have - # included in the current pps state. - if r.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue - - # lifo style "breakeven" price calc - pp.lifo_update( - r.size, - r.price, - - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost=2*r.cost, - ) - - # track clearing data - pp.update(r) - - return pps + table = PpTable(pps) + table.update_from_trans(records) + return table.pps -def load_pps_from_ledger( +def load_trans_from_ledger( brokername: str, acctname: str, @@ -385,55 +482,18 @@ def load_pps_from_ledger( return {} brokermod = get_brokermod(brokername) - src_records = brokermod.norm_trade_records(ledger) + src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) if filter_by: + records = {} bsuids = set(filter_by) - records = list(filter(lambda r: r.bsuid in bsuids, src_records)) + for tid, r in src_records.items(): + if r.bsuid in bsuids: + records[tid] = r else: records = src_records - return update_pps(records) - - -def get_pps( - brokername: str, - acctids: Optional[set[str]] = set(), - -) -> dict[str, dict[str, Position]]: - ''' - Read out broker-specific position entries from - incremental update file: ``pps.toml``. - - ''' - conf, path = config.load( - 'pps', - # load dicts as inlines to preserve compactness - # _dict=toml.decoder.InlineTableDict, - ) - - all_active = {} - all_closed = {} - - # try to load any ledgers if no section found - bconf, path = config.load('brokers') - accounts = bconf[brokername]['accounts'] - for account in accounts: - - # TODO: instead of this filter we could - # always send all known pps but just not audit - # them since an active client might not be up? - if ( - acctids and - f'{brokername}.{account}' not in acctids - ): - continue - - active, closed = update_pps_conf(brokername, account) - all_active.setdefault(account, {}).update(active) - all_closed.setdefault(account, {}).update(closed) - - return all_active, all_closed + return records # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -578,47 +638,77 @@ def load_pps_from_toml( # caller to pass in a symbol set they'd like to reload from the # underlying ledger to be reprocessed in computing pps state. reload_records: Optional[dict[str, str]] = None, + + # XXX: this is "global" update from ledger flag which + # does a full refresh of pps from the available ledger. update_from_ledger: bool = False, -) -> tuple[dict, dict[str, Position]]: +) -> tuple[PpTable, dict[str, str]]: ''' Load and marshal to objects all pps from either an existing ``pps.toml`` config, or from scratch from a ledger file when none yet exists. + ''' + with open_pps( + brokername, + acctid, + write_on_exit=False, + ) as table: + pp_objs = table.pps + + # no pps entry yet for this broker/account so parse any available + # ledgers to build a brand new pps state. + if not pp_objs or update_from_ledger: + trans = load_trans_from_ledger( + brokername, + acctid, + ) + table.update_from_trans(trans) + + # Reload symbol specific ledger entries if requested by the + # caller **AND** none exist in the current pps state table. + elif ( + pp_objs and reload_records + ): + # no pps entry yet for this broker/account so parse + # any available ledgers to build a pps state. + trans = load_trans_from_ledger( + brokername, + acctid, + filter_by=reload_records, + ) + table.update_from_trans(trans) + + if not table.pps: + log.warning( + f'No `pps.toml` values could be loaded {brokername}:{acctid}' + ) + + return table, table.conf + + +@cm +def open_pps( + brokername: str, + acctid: str, + write_on_exit: bool = True, + +) -> PpTable: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + ''' conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + pp_objs = {} + table = PpTable(pp_objs, conf=conf) - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pps or update_from_ledger: - pp_objs = load_pps_from_ledger( - brokername, - acctid, - ) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pps and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - pp_objs = load_pps_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - - if not pps: - log.warning( - f'No `pps.toml` positions could be loaded {brokername}:{acctid}' - ) - - # unmarshal/load ``pps.toml`` config entries into object form. + # unmarshal/load ``pps.toml`` config entries into object form + # and update `PpTable` obj entries. for fqsn, entry in pps.items(): bsuid = entry['bsuid'] @@ -674,29 +764,62 @@ def load_pps_from_toml( clears=clears, ) - return conf, pp_objs + yield table + + if not write_on_exit: + return + + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') + + pp_entries, closed_pp_objs = table.dump_active(brokername) + conf[brokername][acctid] = pp_entries + + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table + + config.write( + conf, + 'pps', + encoder=enc, + ) def update_pps_conf( brokername: str, acctid: str, - trade_records: Optional[list[Transaction]] = None, + trade_records: Optional[dict[str, Transaction]] = None, ledger_reload: Optional[dict[str, str]] = None, ) -> tuple[ dict[str, Position], dict[str, Position], ]: - - # this maps `.bsuid` values to positions - pp_objs: dict[Union[str, int], Position] + # TODO: ideally we can pass in an existing + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. a couple ideas I can think of, + # - load pps once after backend ledger state + # is loaded and keep maintainend in memory + # inside a with block, + # - mirror this in some client side actor which + # does the actual ledger updates (say the paper + # engine proc if we decide to always spawn it?), + # - do diffs against updates from the ledger writer + # actor and the in-mem state here? if trade_records and ledger_reload: - for r in trade_records: + for tid, r in trade_records.items(): ledger_reload[r.bsuid] = r.fqsn - conf, pp_objs = load_pps_from_toml( + table, conf = load_pps_from_toml( brokername, acctid, reload_records=ledger_reload, @@ -705,60 +828,11 @@ def update_pps_conf( # update all pp objects from any (new) trade records which # were passed in (aka incremental update case). if trade_records: - pp_objs = update_pps( - trade_records, - pps=pp_objs, - ) + table.update_from_trans(trade_records) - pp_entries = {} # dict-serialize all active pps - # NOTE: newly closed position are also important to report/return - # since a consumer, like an order mode UI ;), might want to react - # based on the closure. - closed_pp_objs: dict[str, Position] = {} - - for bsuid in list(pp_objs): - pp = pp_objs[bsuid] - - # XXX: debug hook for size mismatches - # if bsuid == 447767096: - # breakpoint() - - pp.minimize_clears() - - if ( - pp.size == 0 - - # drop time-expired positions (normally derivatives) - or (pp.expiry and pp.expiry < now()) - ): - # if expired the position is closed - pp.size = 0 - - # position is already closed aka "net zero" - closed_pp = pp_objs.pop(bsuid, None) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp - - else: - # serialize to pre-toml form - asdict = pp.to_pretoml() - - if pp.expiry is None: - asdict.pop('expiry', None) - - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = asdict.pop('symbol') - fqsn = s.front_fqsn() - log.info(f'Updating active pp: {fqsn}') - - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{brokername}.') - - pp_entries[brokerless_key] = asdict + # this maps `.bsuid` values to positions + pp_entries, closed_pp_objs = table.dump_active(brokername) + pp_objs: dict[Union[str, int], Position] = table.pps conf[brokername][acctid] = pp_entries