diff --git a/piker/pp.py b/piker/pp.py index a9950ef2..6030249e 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -31,6 +31,8 @@ from typing import ( ) from msgspec import Struct +import pendulum +from pendulum import datetime, now import toml from . import config @@ -93,8 +95,8 @@ class Transaction(Struct): size: float price: float cost: float # commisions or other additional costs - - # dt: datetime + dt: datetime + expiry: Optional[datetime] = None # optional key normally derived from the broker # backend which ensures the instrument-symbol this record @@ -110,9 +112,14 @@ class Position(Struct): ''' symbol: Symbol - # last size and avg entry price + # can be +ve or -ve for long/short size: float - avg_price: float # TODO: contextual pricing + + # "breakeven price" above or below which pnl moves above and below + # zero for the entirety of the current "trade state". + be_price: float + + # unique backend symbol id bsuid: str # ordered record of known constituent trade messages @@ -121,6 +128,8 @@ class Position(Struct): float, # cost ] = {} + expiry: Optional[datetime] = None + def to_dict(self) -> dict: return { f: getattr(self, f) @@ -130,12 +139,16 @@ class Position(Struct): def to_pretoml(self) -> dict: d = self.to_dict() clears = d.pop('clears') - + expiry = d.pop('expiry') + # if not expiry is None: + # breakpoint() + if expiry: + d['expiry'] = str(expiry) # clears_list = [] inline_table = toml.TomlDecoder().get_empty_inline_table() for tid, data in clears.items(): - inline_table[tid] = data + inline_table[f'{tid}'] = data # clears_list.append(inline_table) @@ -153,7 +166,7 @@ class Position(Struct): symbol = self.symbol lot_size_digits = symbol.lot_size_digits - avg_price, size = ( + be_price, size = ( round( msg['avg_price'], ndigits=symbol.tick_size_digits @@ -164,7 +177,7 @@ class Position(Struct): ), ) - self.avg_price = avg_price + self.be_price = be_price self.size = size @property @@ -174,7 +187,7 @@ class Position(Struct): terms. ''' - return self.avg_price * self.size + return self.be_price * self.size def lifo_update( self, @@ -209,24 +222,24 @@ class Position(Struct): size_diff = abs(new_size) - abs(self.size) if new_size == 0: - self.avg_price = 0 + self.be_price = 0 elif size_diff > 0: # XXX: LOFI incremental update: # only update the "average price" when # the size increases not when it decreases (i.e. the # position is being made smaller) - self.avg_price = ( + self.be_price = ( abs(size) * price # weight of current exec + cost # transaction cost + - self.avg_price * abs(self.size) # weight of previous pp + self.be_price * abs(self.size) # weight of previous pp ) / abs(new_size) self.size = new_size - return new_size, self.avg_price + return new_size, self.be_price def update_pps( @@ -253,10 +266,12 @@ def update_pps( info={}, ), size=0.0, - avg_price=0.0, + be_price=0.0, bsuid=r.bsuid, + expiry=r.expiry, ) ) + # don't do updates for ledger records we already have # included in the current pps state. if r.tid in pp.clears: @@ -307,8 +322,18 @@ def dump_active( closed = {} for k, pp in pps.items(): + asdict = pp.to_pretoml() - if pp.size == 0: + + if pp.expiry is None: + asdict.pop('expiry', None) + + if ( + pp.size == 0 + + # drop time-expired positions (normally derivatives) + or (pp.expiry and pp.expiry < now()) + ): closed[k] = asdict else: active[k] = asdict @@ -321,7 +346,7 @@ def load_pps_from_ledger( brokername: str, acctname: str, -) -> tuple[dict, dict]: +) -> dict[str, Position]: ''' Open a ledger file by broker name and account and read in and process any trade records into our normalized ``Transaction`` @@ -341,8 +366,7 @@ def load_pps_from_ledger( brokermod = get_brokermod(brokername) records = brokermod.norm_trade_records(ledger) - pps = update_pps(records) - return dump_active(pps) + return update_pps(records) def get_pps( @@ -509,7 +533,7 @@ def update_pps_conf( if not pps: # no pps entry yet for this broker/account so parse # any available ledgers to build a pps state. - pps, closed = load_pps_from_ledger( + pp_objs = load_pps_from_ledger( brokername, acctid, ) @@ -518,30 +542,37 @@ def update_pps_conf( f'No trade history could be loaded for {brokername}:{acctid}' ) - # unmarshal/load ``pps.toml`` config entries into object form. - pp_objs = {} - for fqsn, entry in pps.items(): + else: + # unmarshal/load ``pps.toml`` config entries into object form. + pp_objs = {} + for fqsn, entry in pps.items(): - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears = entry['clears'] - # clears = {} - # for table in entry['clears']: - # clears.update(table) + # convert clears sub-tables (only in this form + # for toml re-presentation) back into a master table. + clears = entry['clears'] + expiry = entry.get('expiry') + if expiry: + expiry = pendulum.parse(expiry) - pp_objs[fqsn] = Position( - Symbol.from_fqsn(fqsn, info={}), - size=entry['size'], - avg_price=entry['avg_price'], - bsuid=entry['bsuid'], + # clears = {} + # for k, v in clears.items(): + # print((k, v)) + # clears.update(table) - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, - ) + pp_objs[fqsn] = Position( + Symbol.from_fqsn(fqsn, info={}), + size=entry['size'], + be_price=entry['be_price'], + expiry=expiry, + bsuid=entry['bsuid'], + + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + clears=clears, + ) # update all pp objects from any (new) trade records which # were passed in (aka incremental update case).