diff --git a/piker/pp.py b/piker/pp.py
index 8180d515..35441fb9 100644
--- a/piker/pp.py
+++ b/piker/pp.py
@@ -20,9 +20,8 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
 (looking at you `ib` and dirt-bird friends)
 
 '''
-from collections import deque
 from contextlib import contextmanager as cm
-# from pprint import pformat
+from pprint import pformat
 import os
 from os import path
 from math import copysign
@@ -149,7 +148,7 @@ class Position(Struct):
             for f in self.__struct_fields__
         }
 
-    def to_pretoml(self) -> dict:
+    def to_pretoml(self) -> tuple[str, dict]:
         '''
         Prep this position's data contents for export to toml including
         re-structuring of the ``.clears`` table to an array of
@@ -160,6 +159,13 @@ class Position(Struct):
         clears = d.pop('clears')
         expiry = d.pop('expiry')
 
+        # TODO: we need to figure out how to have one top level
+        # listing venue here even when the backend isn't providing
+        # it via the trades ledger..
+        # drop symbol obj in serialized form
+        s = d.pop('symbol')
+        fqsn = s.front_fqsn()
+
         size = d.pop('size')
         be_price = d.pop('be_price')
         d['size'], d['be_price'] = self.audit_sizing(size, be_price)
@@ -181,7 +187,7 @@ class Position(Struct):
 
         d['clears'] = toml_clears_list
 
-        return d
+        return fqsn, d
 
     def audit_sizing(
         self,
@@ -199,7 +205,7 @@ class Position(Struct):
         size = size or self.size
         be_price = be_price or self.be_price
         csize = self.calc_size()
-        cbe_price = self.calc_be_price()
+        cbe_price = self.calc_ppu()
 
         if size != csize:
             log.warning(f'size != calculated size: {size} != {csize}')
@@ -246,18 +252,6 @@ class Position(Struct):
         '''
         return self.be_price * self.size
 
-    def add_clear(
-        self,
-        t: Transaction,
-
-    ) -> None:
-        self.clears[t.tid] = {
-            'cost': t.cost,
-            'price': t.price,
-            'size': t.size,
-            'dt': str(t.dt),
-        }
-
     # TODO: idea: "real LIFO" dynamic positioning.
     # - when a trade takes place where the pnl for
    # the (set of) trade(s) is below the breakeven price
@@ -269,7 +263,7 @@ class Position(Struct):
     # def lifo_price() -> float:
     #     ...
 
-    def calc_be_price(
+    def calc_ppu(
         self,
         # include transaction cost in breakeven price
         # and presume the worst case of the same cost
@@ -278,60 +272,105 @@ class Position(Struct):
         cost_scalar: float = 2,
 
     ) -> float:
+        '''
+        Compute the "price-per-unit" price for the given non-zero sized
+        rolling position.
 
-        size: float = 0
-        cb_tot_size: float = 0
-        cost_basis: float = 0
-        be_price: float = 0
+        The recurrence relation which computes this (size-weighted) mean
+        per new clear which **increases** the accumulative position size
+        is:
+
+        ppu[-1] = (
+            ppu[-2] * accum_size[-2]
+            +
+            price * size
+        ) / accum_size[-1]
+
+        where `cost_basis` for the current step is simply the price
+        * size of the most recent clearing transaction.
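+
+        As a quick sanity example (hypothetical numbers): buying 10
+        units @ 100 and then 5 units @ 130, ignoring transaction
+        costs, gives
+
+        ppu = (100 * 10 + 130 * 5) / 15 = 110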
+
+        '''
+        asize_h: list[float] = []  # historical accumulative size
+        ppu_h: list[float] = []  # historical price-per-unit
+
+        clears = list(self.clears.items())
+
+        for i, (tid, entry) in enumerate(clears):
-        for tid, entry in self.clears.items():
             clear_size = entry['size']
             clear_price = entry['price']
-            new_size = size + clear_size
 
-            # old size minus the new size gives us size diff with
-            # +ve -> increase in pp size
-            # -ve -> decrease in pp size
-            size_diff = abs(new_size) - abs(size)
+            last_accum_size = asize_h[-1] if asize_h else 0
+            accum_size = last_accum_size + clear_size
+            accum_sign = copysign(1, accum_size)
 
-            if new_size == 0:
-                cost_basis = 0
-                cb_tot_size = 0
-                be_price = 0
+            sign_change: bool = False
+
+            if accum_size == 0:
+                ppu_h.append(0)
+                asize_h.append(0)
+                continue
+
+            # test if the pp somehow went "past" a net zero size state
+            # resulting in a change of the "sign" of the size (+ve for
+            # long, -ve for short).
+            sign_change = (
+                copysign(1, last_accum_size) + accum_sign == 0
+                and last_accum_size != 0
+            )
+
+            # since we passed the net-zero-size state the new size
+            # after sum should be the remaining size the new
+            # "direction" (aka, long vs. short) for this clear.
+            if sign_change:
+                clear_size = accum_size
+                abs_diff = abs(accum_size)
+                asize_h.append(0)
+                ppu_h.append(0)
+
+            else:
+                # old size minus the new size gives us size diff with
+                # +ve -> increase in pp size
+                # -ve -> decrease in pp size
+                abs_diff = abs(accum_size) - abs(last_accum_size)
 
             # XXX: LIFO breakeven price update. only an increaze in size
             # of the position contributes the breakeven price,
             # a decrease does not (i.e. the position is being made
             # smaller).
-            elif size_diff > 0:
+            # abs_clear_size = abs(clear_size)
+            abs_new_size = abs(accum_size)
 
-                cost_basis += (
-                    # weighted price per unit of
+            if abs_diff > 0:
+
+                cost_basis = (
+                    # cost basis for this clear
                     clear_price * abs(clear_size)
                     +
                     # transaction cost
-                    (copysign(1, new_size)
-                     *
-                     cost_scalar
-                     *
-                     entry['cost'])
+                    accum_sign * cost_scalar * entry['cost']
                 )
-                cb_tot_size += abs(clear_size)
-                be_price = cost_basis / cb_tot_size
-                size = new_size
+                if asize_h:
+                    size_last = abs(asize_h[-1])
+                    cb_last = ppu_h[-1] * size_last
+                    ppu = (cost_basis + cb_last) / abs_new_size
 
-                # print(
-                #     f'cb: {cost_basis}\n'
-                #     f'size: {size}\n'
-                #     f'clear_size: {clear_size}\n'
-                #     f'clear_price: {clear_price}\n\n'
+                else:
+                    ppu = cost_basis / abs_new_size
 
-                #     f'cb_tot_size: {cb_tot_size}\n'
-                #     f'be_price: {be_price}\n\n'
-                # )
+                ppu_h.append(ppu)
+                asize_h.append(accum_size)
 
-        return be_price
+            else:
+                # on "exit" clears from a given direction, only the
+                # size changes; the price-per-unit remains constant
+                # and gets weighted by the new (reduced) size.
+                asize_h.append(accum_size)
+                ppu_h.append(ppu_h[-1])
+
+        return ppu_h[-1] if ppu_h else 0
 
     def calc_size(self) -> float:
         size: float = 0
@@ -349,17 +388,21 @@ class Position(Struct):
         unecessary history irrelevant to the current pp state.
 
         '''
-        size: float = self.size
-        clears_since_zero: deque[tuple(str, dict)] = deque()
+        size: float = 0
+        clears_since_zero: list[tuple(str, dict)] = []
 
-        # scan for the last "net zero" position by
-        # iterating clears in reverse.
-        for tid, clear in reversed(self.clears.items()):
-            size -= clear['size']
-            clears_since_zero.appendleft((tid, clear))
+        # TODO: we might just want to always do this when iterating
+        # a ledger? keep a state of the last net-zero and only do the
+        # full iterate when no state was stashed?
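+
+        # e.g. a clears sequence of +10, -10, +7 nets to zero at the
+        # second entry, so only the trailing +7 clear gets kept.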
+
+        # scan for the last "net zero" position by iterating
+        # transactions until the next net-zero size, rinse, repeat.
+        for tid, clear in self.clears.items():
+            size += clear['size']
+            clears_since_zero.append((tid, clear))
 
             if size == 0:
-                break
+                clears_since_zero.clear()
 
         self.clears = dict(clears_since_zero)
         return self.clears
@@ -367,6 +410,7 @@
 
 
 class PpTable(Struct):
 
+    brokername: str
     pps: dict[str, Position]
     conf: Optional[dict] = {}
@@ -378,31 +422,30 @@ class PpTable(Struct):
     ) -> dict[str, Position]:
 
         pps = self.pps
-
         updated: dict[str, Position] = {}
 
         # lifo update all pps from records
-        for tid, r in trans.items():
+        for tid, t in trans.items():
 
             pp = pps.setdefault(
-                r.bsuid,
+                t.bsuid,
 
                 # if no existing pp, allocate fresh one.
                 Position(
                     Symbol.from_fqsn(
-                        r.fqsn,
+                        t.fqsn,
                         info={},
                     ),
                     size=0.0,
                     be_price=0.0,
-                    bsuid=r.bsuid,
-                    expiry=r.expiry,
+                    bsuid=t.bsuid,
+                    expiry=t.expiry,
                 )
             )
 
             # don't do updates for ledger records we already have
             # included in the current pps state.
-            if r.tid in pp.clears:
+            if t.tid in pp.clears:
                 # NOTE: likely you'll see repeats of the same
                 # ``Transaction`` passed in here if/when you are restarting
                 # a ``brokerd.ib`` where the API will re-report trades from
@@ -410,22 +453,35 @@ class PpTable(Struct):
                 # "double count" these in pp calculations.
                 continue
 
-            # track clearing data
-            pp.add_clear(r)
-            updated[r.bsuid] = pp
+            # update clearing table and populate rolling
+            # ppu and accumulative size.
+            clear = pp.clears[t.tid] = {
+                'cost': t.cost,
+                'price': t.price,
+                'size': t.size,
+                'dt': str(t.dt),
+            }
+
+            # TODO: compute these incrementally instead
+            # of re-looping through each time resulting in O(n**2)
+            # behaviour..
+            # compute these **after** adding the entry
+            # in order to make the recurrence relation math work
+            # inside ``.calc_ppu()``.
+            clear['accum_size'] = pp.calc_size()
+            clear['ppu'] = pp.calc_ppu()
+            updated[t.bsuid] = pp
 
         # minimize clears tables and update sizing.
         for bsuid, pp in updated.items():
-            pp.minimize_clears()
             pp.size, pp.be_price = pp.audit_sizing()
 
         return updated
 
     def dump_active(
         self,
-        brokername: str,
+
     ) -> tuple[
-        dict[str, Any],
+        dict[str, Position],
         dict[str, Position]
     ]:
         '''
@@ -435,13 +491,12 @@ class PpTable(Struct):
         '''
-        # ONLY dict-serialize all active positions; those that are closed
-        # we don't store in the ``pps.toml``.
         # NOTE: newly closed position are also important to report/return
         # since a consumer, like an order mode UI ;), might want to react
-        # based on the closure.
-        pp_active_entries = {}
+        # based on the closure (for example removing the breakeven line
+        # and clearing the entry from any lists/monitors).
         closed_pp_objs: dict[str, Position] = {}
+        open_pp_objs: dict[str, Position] = {}
 
         pp_objs = self.pps
         for bsuid in list(pp_objs):
 
             pp = pp_objs[bsuid]
@@ -452,7 +507,6 @@ class PpTable(Struct):
             # XXX: debug hook for size mismatches
             # qqqbsuid = 320227571
             # if bsuid == qqqbsuid:
             #     breakpoint()
 
-            pp.minimize_clears()
             size, be_price = pp.audit_sizing()
 
             if (
@@ -471,48 +525,42 @@ class PpTable(Struct):
                 # ignored; the closed positions won't be written to the
                 # ``pps.toml`` since ``pp_active_entries`` above is what's
                 # written.
-                # closed_pp = pp_objs.pop(bsuid, None)
-                closed_pp = pp_objs.get(bsuid)
-                if closed_pp:
-                    closed_pp_objs[bsuid] = closed_pp
+                closed_pp_objs[bsuid] = pp
 
             else:
-                # serialize to pre-toml form
-                asdict = pp.to_pretoml()
+                open_pp_objs[bsuid] = pp
 
-                # TODO: we need to figure out how to have one top level
-                # listing venue here even when the backend isn't providing
-                # it via the trades ledger..
-                # drop symbol obj in serialized form
-                s = asdict.pop('symbol')
-                fqsn = s.front_fqsn()
-                log.info(f'Updating active pp: {fqsn}')
+        return open_pp_objs, closed_pp_objs
 
-                # XXX: ugh, it's cuz we push the section under
-                # the broker name.. maybe we need to rethink this?
-                brokerless_key = fqsn.removeprefix(f'{brokername}.')
+    def to_toml(
+        self,
+    ) -> dict[str, Any]:
 
-                pp_active_entries[brokerless_key] = asdict
+        active, closed = self.dump_active()
 
-        return pp_active_entries, closed_pp_objs
+        # ONLY dict-serialize all active positions; those that are closed
+        # we don't store in the ``pps.toml``.
+        to_toml_dict = {}
+
+        for bsuid, pos in active.items():
+
+            # keep the minimal amount of clears that make up this
+            # position since the last net-zero state.
+            pos.minimize_clears()
+
+            # serialize to pre-toml form
+            fqsn, asdict = pos.to_pretoml()
+            log.info(f'Updating active pp: {fqsn}')
+
+            # XXX: ugh, it's cuz we push the section under
+            # the broker name.. maybe we need to rethink this?
+            brokerless_key = fqsn.removeprefix(f'{self.brokername}.')
+            to_toml_dict[brokerless_key] = asdict
+
+        return to_toml_dict
 
 
-def update_pps(
-    records: dict[str, Transaction],
-    pps: Optional[dict[str, Position]] = None
-
-) -> dict[str, Position]:
-    '''
-    Compile a set of positions from a trades ledger.
-
-    '''
-    pps: dict[str, Position] = pps or {}
-    table = PpTable(pps)
-    table.update_from_trans(records)
-    return table.pps
-
-
-def load_trans_from_ledger(
+def load_pps_from_ledger(
     brokername: str,
     acctname: str,
 
@@ -520,35 +568,40 @@
     # post normalization filter on ledger entries to be processed
     filter_by: Optional[list[dict]] = None,
 
-) -> dict[str, Position]:
+) -> tuple[
+    dict[str, Transaction],
+    dict[str, Position],
+]:
     '''
     Open a ledger file by broker name and account and read in and
-    process any trade records into our normalized ``Transaction``
-    form and then pass these into the position processing routine
-    and deliver the two dict-sets of the active and closed pps.
+    process any trade records into our normalized ``Transaction`` form
+    and then update the equivalent ``PpTable`` and deliver the two
+    bsuid-mapped dict-sets of the transactions and pps.
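+
+    For example, with some hypothetical ``ib`` paper account:
+
+        trans, pps = load_pps_from_ledger('ib', 'algopaper')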
''' - with open_trade_ledger( - brokername, - acctname, - ) as ledger: + with ( + open_trade_ledger(brokername, acctname) as ledger, + open_pps(brokername, acctname) as table, + ): if not ledger: # null case, no ledger file with content return {} - brokermod = get_brokermod(brokername) - src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) + mod = get_brokermod(brokername) + src_records: dict[str, Transaction] = mod.norm_trade_records(ledger) - if filter_by: - records = {} - bsuids = set(filter_by) - for tid, r in src_records.items(): - if r.bsuid in bsuids: - records[tid] = r - else: - records = src_records + if filter_by: + records = {} + bsuids = set(filter_by) + for tid, r in src_records.items(): + if r.bsuid in bsuids: + records[tid] = r + else: + records = src_records - return records + updated = table.update_from_trans(records) + + return records, updated # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -682,67 +735,6 @@ class PpsEncoder(toml.TomlEncoder): return (retstr, retdict) -def load_pps_from_toml( - brokername: str, - acctid: str, - - # XXX: there is an edge case here where we may want to either audit - # the retrieved ``pps.toml`` output or reprocess it since there was - # an error on write on the last attempt to update the state file - # even though the ledger *was* updated. For this cases we allow the - # caller to pass in a symbol set they'd like to reload from the - # underlying ledger to be reprocessed in computing pps state. - reload_records: Optional[dict[str, str]] = None, - - # XXX: this is "global" update from ledger flag which - # does a full refresh of pps from the available ledger. - update_from_ledger: bool = False, - -) -> tuple[PpTable, dict[str, str]]: - ''' - Load and marshal to objects all pps from either an existing - ``pps.toml`` config, or from scratch from a ledger file when - none yet exists. - - ''' - with open_pps( - brokername, - acctid, - write_on_exit=False, - ) as table: - pp_objs = table.pps - - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pp_objs or update_from_ledger: - trans = load_trans_from_ledger( - brokername, - acctid, - ) - table.update_from_trans(trans) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pp_objs and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - trans = load_trans_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - table.update_from_trans(trans) - - if not table.pps: - log.warning( - f'No `pps.toml` values could be loaded {brokername}:{acctid}' - ) - - return table, table.conf - - @cm def open_pps( brokername: str, @@ -759,8 +751,22 @@ def open_pps( brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + # TODO: ideally we can pass in an existing + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. a couple ideas I can think of, + # - mirror this in some client side actor which + # does the actual ledger updates (say the paper + # engine proc if we decide to always spawn it?), + # - do diffs against updates from the ledger writer + # actor and the in-mem state here? 
+ pp_objs = {} - table = PpTable(pp_objs, conf=conf) + table = PpTable( + brokername, + pp_objs, + conf=conf, + ) # unmarshal/load ``pps.toml`` config entries into object form # and update `PpTable` obj entries. @@ -817,7 +823,8 @@ def open_pps( # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries print(f'Updating ``pps.toml`` for {path}:\n') - pp_entries, closed_pp_objs = table.dump_active(brokername) + # active, closed_pp_objs = table.dump_active() + pp_entries = table.to_toml() conf[brokername][acctid] = pp_entries # TODO: why tf haven't they already done this for inline @@ -835,66 +842,6 @@ def open_pps( ) -def update_pps_conf( - brokername: str, - acctid: str, - - trade_records: Optional[dict[str, Transaction]] = None, - ledger_reload: Optional[dict[str, str]] = None, - -) -> tuple[ - dict[str, Position], - dict[str, Position], -]: - # TODO: ideally we can pass in an existing - # pps state to this right? such that we - # don't have to do a ledger reload all the - # time.. a couple ideas I can think of, - # - load pps once after backend ledger state - # is loaded and keep maintainend in memory - # inside a with block, - # - mirror this in some client side actor which - # does the actual ledger updates (say the paper - # engine proc if we decide to always spawn it?), - # - do diffs against updates from the ledger writer - # actor and the in-mem state here? - - if trade_records and ledger_reload: - for tid, r in trade_records.items(): - ledger_reload[r.bsuid] = r.fqsn - - table, conf = load_pps_from_toml( - brokername, - acctid, - reload_records=ledger_reload, - ) - - # update all pp objects from any (new) trade records which - # were passed in (aka incremental update case). - if trade_records: - table.update_from_trans(trade_records) - - # this maps `.bsuid` values to positions - pp_entries, closed_pp_objs = table.dump_active(brokername) - pp_objs: dict[Union[str, int], Position] = table.pps - - conf[brokername][acctid] = pp_entries - - # TODO: why tf haven't they already done this for inline tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table - - config.write( - conf, - 'pps', - encoder=enc, - ) - - # deliver object form of all pps in table to caller - return pp_objs, closed_pp_objs - - if __name__ == '__main__': import sys @@ -903,4 +850,9 @@ if __name__ == '__main__': args = args[1:] for acctid in args: broker, name = acctid.split('.') - update_pps_conf(broker, name) + trans, updated_pps = load_pps_from_ledger(broker, name) + print( + f'Processing transactions into pps for {broker}:{acctid}\n' + f'{pformat(trans)}\n\n' + f'{pformat(updated_pps)}' + )