diff --git a/piker/pp.py b/piker/pp.py index 7b775c77..7f363915 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -15,9 +15,9 @@ # along with this program. If not, see . ''' -Personal/Private position parsing, calculmating, summarizing in a way +Personal/Private position parsing, calculating, summarizing in a way that doesn't try to cuk most humans who prefer to not lose their moneys.. -(looking at you `ib` and shitzy friends) +(looking at you `ib` and dirt-bird friends) ''' from contextlib import contextmanager as cm @@ -86,9 +86,9 @@ def open_trade_ledger( return toml.dump(ledger, cf) -class TradeRecord(Struct): +class Transaction(Struct): fqsn: str # normally fqsn - tid: Union[str, int] + tid: Union[str, int] # unique transaction id size: float price: float cost: float # commisions or other additional costs @@ -98,7 +98,7 @@ class TradeRecord(Struct): # optional key normally derived from the broker # backend which ensures the instrument-symbol this record # is for is truly unique. - symkey: Optional[Union[str, int]] = None + bsuid: Optional[Union[str, int]] = None class Position(Struct): @@ -113,9 +113,14 @@ class Position(Struct): # last size and avg entry price size: float avg_price: float # TODO: contextual pricing + bsuid: str # ordered record of known constituent trade messages - fills: list[Union[str, int, Status]] = [] + fills: dict[ + Union[str, int, Status], # trade id + float, # cost + ] = {} + def to_dict(self): return { @@ -160,6 +165,7 @@ class Position(Struct): self, size: float, price: float, + cost: float = 0, # TODO: idea: "real LIFO" dynamic positioning. # - when a trade takes place where the pnl for @@ -198,6 +204,8 @@ class Position(Struct): self.avg_price = ( abs(size) * price # weight of current exec + + cost # transaction cost + + self.avg_price * abs(self.size) # weight of previous pp ) / abs(new_size) @@ -207,9 +215,7 @@ class Position(Struct): def update_pps( - brokername: str, - records: dict[str, TradeRecord], - + records: dict[str, Transaction], pps: Optional[dict[str, Position]] = None ) -> dict[str, Position]: @@ -223,7 +229,7 @@ def update_pps( for r in records: pp = pps.setdefault( - r.fqsn or r.symkey, + r.fqsn or r.bsuid, # if no existing pp, allocate fresh one. Position( @@ -233,27 +239,39 @@ def update_pps( ), size=0.0, avg_price=0.0, + bsuid=r.bsuid, ) ) # don't do updates for ledger records we already have # included in the current pps state. if r.tid in pp.fills: # NOTE: likely you'll see repeats of the same - # ``TradeRecord`` passed in here if/when you are restarting + # ``Transaction`` passed in here if/when you are restarting # a ``brokerd.ib`` where the API will re-report trades from # the current session, so we need to make sure we don't # "double count" these in pp calculations. continue - # lifo style average price calc - pp.lifo_update(r.size, r.price) - pp.fills.append(r.tid) + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing costs + pp.fills[r.tid] = r.cost assert len(set(pp.fills)) == len(pp.fills) return pps -def _split_active( +def dump_active( pps: dict[str, Position], ) -> tuple[ @@ -277,9 +295,9 @@ def _split_active( fqsn = pp.symbol.front_fqsn() asdict = pp.to_dict() if pp.size == 0: - closed[fqsn] = asdict + closed[k] = asdict else: - active[fqsn] = asdict + active[k] = asdict return active, closed @@ -292,7 +310,7 @@ def load_pps_from_ledger( ) -> tuple[dict, dict]: ''' Open a ledger file by broker name and account and read in and - process any trade records into our normalized ``TradeRecord`` + process any trade records into our normalized ``Transaction`` form and then pass these into the position processing routine and deliver the two dict-sets of the active and closed pps. @@ -305,9 +323,8 @@ def load_pps_from_ledger( brokermod = get_brokermod(brokername) records = brokermod.norm_trade_records(ledger) - pps = update_pps(brokername, records) - - return _split_active(pps) + pps = update_pps(records) + return dump_active(pps) def get_pps( @@ -326,57 +343,55 @@ def get_pps( def update_pps_conf( brokername: str, acctid: str, - trade_records: Optional[list[TradeRecord]] = None, + trade_records: Optional[list[Transaction]] = None, ) -> dict[str, Position]: conf, path = config.load('pps') brokersection = conf.setdefault(brokername, {}) - entries = brokersection.setdefault(acctid, {}) - - if not entries: + accountsection = pps = brokersection.setdefault(acctid, {}) + if not pps: # no pps entry yet for this broker/account so parse # any available ledgers to build a pps state. - active, closed = load_pps_from_ledger( + pps, closed = load_pps_from_ledger( brokername, acctid, ) - - elif trade_records: - - # table for map-back to object form - pps = {} - - # load ``pps.toml`` config entries back into object form. - for fqsn, entry in entries.items(): - pps[f'{fqsn}.{brokername}'] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=entry['size'], - avg_price=entry['avg_price'], - - # XXX: super critical, we need to be sure to include - # all pps.toml fills to avoid reusing fills that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - fills=entry['fills'], + if not pps: + log.warning( + f'No trade history could be loaded for {brokername}:{acctid}' ) - pps = update_pps( - brokername, - trade_records, - pps=pps, + # unmarshal/load ``pps.toml`` config entries into object form. + pp_objs = {} + for fqsn, entry in pps.items(): + pp_objs[fqsn] = Position( + Symbol.from_fqsn(fqsn, info={}), + size=entry['size'], + avg_price=entry['avg_price'], + bsuid=entry['bsuid'], + + # XXX: super critical, we need to be sure to include + # all pps.toml fills to avoid reusing fills that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + fills=entry['fills'], ) - active, closed = _split_active(pps) - else: - raise ValueError('wut wut') + # update all pp objects from any (new) trade records which + # were passed in (aka incremental update case). + if trade_records: + pp_objs = update_pps( + trade_records, + pps=pp_objs, + ) - for fqsn in closed: - print(f'removing closed pp: {fqsn}') - entries.pop(fqsn, None) + active, closed = dump_active(pp_objs) + # dict-serialize all active pps + pp_entries = {} for fqsn, pp_dict in active.items(): print(f'Updating active pp: {fqsn}') @@ -386,8 +401,9 @@ def update_pps_conf( # XXX: ugh, it's cuz we push the section under # the broker name.. maybe we need to rethink this? brokerless_key = fqsn.rstrip(f'.{brokername}') - entries[brokerless_key] = pp_dict + pp_entries[brokerless_key] = pp_dict + conf[brokername][acctid] = pp_entries config.write( conf, 'pps',