Rework "breakeven" price as "price-per-uni": ppu

The original implementation of `.calc_be_price()` wasn't correct since
the real, so called "price per unit" (ppu) is actually defined by a
recurrence relation (which is why the original state-updating
`.lifo_update()` approach worked well): on each new clear event the
previous ppu must be weighted by the newly accumulated position size.
The ppu is the price above or below which the trader takes a win or
loss on transacting one unit of the trading asset, and it is thus the
true "break even price" that determines making or losing money per
fill. This patch fixes the implementation to use trailing windows of
the accumulated size and ppu to compute the next ppu value for any new
clear event, and also handles the rare case where the "direction"
changes polarity (e.g. long to short in a single order). The new
method is `Position.calc_ppu()` and further details of the relation
can be found in the doc strings.
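
As a rough standalone sketch of the relation (a hypothetical helper,
simplified to ignore transaction costs and the polarity-flip case,
both of which `Position.calc_ppu()` does handle):

    def calc_ppu(clears: list[tuple[float, float]]) -> float:
        # `clears` is a stand-in list of (price, size) fill tuples.
        accum_size: float = 0
        ppu: float = 0
        for price, size in clears:
            new_accum = accum_size + size
            if new_accum == 0:
                # position fully exited: reset the rolling state.
                accum_size, ppu = 0, 0
                continue
            if abs(new_accum) > abs(accum_size):
                # size increase: weight the prior ppu by the prior
                # accumulated size and fold in this clear's cost basis.
                ppu = (
                    ppu * abs(accum_size)
                    +
                    price * abs(size)
                ) / abs(new_accum)
            # size decrease: the ppu stays constant, only size shrinks.
            accum_size = new_accum
        return ppu

    # eg. buy 2 @ 100, buy 2 @ 110 -> ppu == 105; selling 1 unit
    # afterward leaves the ppu unchanged at 105.
    assert calc_ppu([(100, 2), (110, 2), (105, -1)]) == 105.0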

This patch also includes a wack-ton of clean ups and removals in an
effort to refine the position management api for easier use in new
backends:

- drop `update_pps_conf()`, `load_pps_from_toml()` and rename
  `load_trans_from_ledger()` -> `load_pps_from_ledger()`.
- extend `PpTable` to have a `.to_toml()` method which returns the
  active set of positions ready to be serialized to the `pps.toml`
  file, which it collects by calling,
- `PpTable.dump_active()` which now returns double dicts of the
  open/closed pp object maps (see the usage sketch after this list).
- make `Position.minimize_clears()` now iterate the clears table in
  chronological order (instead of reverse) and only drop fills prior
  to any zero-size state (the old reversed way could result in
  incorrect history-size-retracement in cases where a position is
  lessened but not completely exited).
- drop `Position.add_clear()` and instead just manually add entries
  inside `.update_from_trans()`; also add `accum_size` and `ppu`
  fields to every entry, thus creating a position "history" sequence
  of the ppu and accumulated size for every position, which prepares
  for being able to show "position lifetimes" in the UI.
- move fqsn getting into `Position.to_pretoml()`.
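
A rough usage sketch of the reworked api (the `piker.pp` import path
and the broker/account names here are assumptions for illustration):

    from pprint import pformat
    from piker.pp import load_pps_from_ledger, open_pps

    # process a broker ledger into normalized transactions and
    # bsuid-mapped position objects in one call:
    trans, updated_pps = load_pps_from_ledger('ib', 'algopaper')
    print(f'{pformat(trans)}\n{pformat(updated_pps)}')

    # or manage table state directly; `pps.toml` is written on exit:
    with open_pps('ib', 'algopaper') as table:
        # split into open vs. recently closed position object maps
        open_pps_map, closed_pps_map = table.dump_active()
        # serialize only the active set, ready for `pps.toml`
        pp_entries = table.to_toml()
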
ppu_history
Tyler Goodlet 2022-07-26 11:27:38 -04:00
parent 5520e9ef21
commit ddffaa952d
1 changed file with 207 additions and 255 deletions


@@ -20,9 +20,8 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
 (looking at you `ib` and dirt-bird friends)

 '''
-from collections import deque
 from contextlib import contextmanager as cm
-# from pprint import pformat
+from pprint import pformat
 import os
 from os import path
 from math import copysign
@@ -149,7 +148,7 @@ class Position(Struct):
             for f in self.__struct_fields__
         }

-    def to_pretoml(self) -> dict:
+    def to_pretoml(self) -> tuple[str, dict]:
         '''
         Prep this position's data contents for export to toml including
         re-structuring of the ``.clears`` table to an array of
@@ -160,6 +159,13 @@ class Position(Struct):
         clears = d.pop('clears')
         expiry = d.pop('expiry')

+        # TODO: we need to figure out how to have one top level
+        # listing venue here even when the backend isn't providing
+        # it via the trades ledger..
+        # drop symbol obj in serialized form
+        s = d.pop('symbol')
+        fqsn = s.front_fqsn()
+
         size = d.pop('size')
         be_price = d.pop('be_price')
         d['size'], d['be_price'] = self.audit_sizing(size, be_price)
@@ -181,7 +187,7 @@ class Position(Struct):
         d['clears'] = toml_clears_list

-        return d
+        return fqsn, d

     def audit_sizing(
         self,
@@ -199,7 +205,7 @@ class Position(Struct):
         size = size or self.size
         be_price = be_price or self.be_price
         csize = self.calc_size()
-        cbe_price = self.calc_be_price()
+        cbe_price = self.calc_ppu()

         if size != csize:
             log.warning(f'size != calculated size: {size} != {csize}')
@@ -246,18 +252,6 @@ class Position(Struct):
         '''
         return self.be_price * self.size

-    def add_clear(
-        self,
-        t: Transaction,
-
-    ) -> None:
-        self.clears[t.tid] = {
-            'cost': t.cost,
-            'price': t.price,
-            'size': t.size,
-            'dt': str(t.dt),
-        }
-
     # TODO: idea: "real LIFO" dynamic positioning.
     # - when a trade takes place where the pnl for
     # the (set of) trade(s) is below the breakeven price
@@ -269,7 +263,7 @@ class Position(Struct):
     # def lifo_price() -> float:
     #     ...

-    def calc_be_price(
+    def calc_ppu(
         self,
         # include transaction cost in breakeven price
         # and presume the worst case of the same cost
@@ -278,60 +272,105 @@ class Position(Struct):
         cost_scalar: float = 2,

     ) -> float:
-
-        size: float = 0
-        cb_tot_size: float = 0
-        cost_basis: float = 0
-        be_price: float = 0
-
-        for tid, entry in self.clears.items():
+        '''
+        Compute the "price-per-unit" price for the given non-zero sized
+        rolling position.
+
+        The recurrence relation which computes this (exponential) mean
+        per new clear which **increases** the accumulative postiion size
+        is:
+
+        ppu[-1] = (
+            ppu[-2] * accum_size[-2]
+            +
+            ppu[-1] * size
+        ) / accum_size[-1]
+
+        where `cost_basis` for the current step is simply the price
+        * size of the most recent clearing transaction.
+
+        '''
+        asize_h: list[float] = []  # historical accumulative size
+        ppu_h: list[float] = []  # historical price-per-unit
+
+        clears = list(self.clears.items())
+        for i, (tid, entry) in enumerate(clears):
+
             clear_size = entry['size']
             clear_price = entry['price']
-            new_size = size + clear_size
-
-            # old size minus the new size gives us size diff with
-            # +ve -> increase in pp size
-            # -ve -> decrease in pp size
-            size_diff = abs(new_size) - abs(size)
-
-            if new_size == 0:
-                cost_basis = 0
-                cb_tot_size = 0
-                be_price = 0
+
+            last_accum_size = asize_h[-1] if asize_h else 0
+            accum_size = last_accum_size + clear_size
+            accum_sign = copysign(1, accum_size)
+
+            sign_change: bool = False
+
+            if accum_size == 0:
+                ppu_h.append(0)
+                asize_h.append(0)
+                continue
+
+            # test if the pp somehow went "passed" a net zero size state
+            # resulting in a change of the "sign" of the size (+ve for
+            # long, -ve for short).
+            sign_change = (
+                copysign(1, last_accum_size) + accum_sign == 0
+                and last_accum_size != 0
+            )
+
+            # since we passed the net-zero-size state the new size
+            # after sum should be the remaining size the new
+            # "direction" (aka, long vs. short) for this clear.
+            if sign_change:
+                clear_size = accum_size
+                abs_diff = abs(accum_size)
+                asize_h.append(0)
+                ppu_h.append(0)
+
+            else:
+                # old size minus the new size gives us size diff with
+                # +ve -> increase in pp size
+                # -ve -> decrease in pp size
+                abs_diff = abs(accum_size) - abs(last_accum_size)

             # XXX: LIFO breakeven price update. only an increaze in size
             # of the position contributes the breakeven price,
             # a decrease does not (i.e. the position is being made
             # smaller).
-            elif size_diff > 0:
-
-                cost_basis += (
-                    # weighted price per unit of
+            # abs_clear_size = abs(clear_size)
+            abs_new_size = abs(accum_size)
+
+            if abs_diff > 0:
+
+                cost_basis = (
+                    # cost basis for this clear
                     clear_price * abs(clear_size)
                     +
                     # transaction cost
-                    (copysign(1, new_size)
-                     *
-                     cost_scalar
-                     *
-                     entry['cost'])
+                    accum_sign * cost_scalar * entry['cost']
                 )
-                cb_tot_size += abs(clear_size)
-                be_price = cost_basis / cb_tot_size
-
-            size = new_size
-
-            # print(
-            #     f'cb: {cost_basis}\n'
-            #     f'size: {size}\n'
-            #     f'clear_size: {clear_size}\n'
-            #     f'clear_price: {clear_price}\n\n'
-            #     f'cb_tot_size: {cb_tot_size}\n'
-            #     f'be_price: {be_price}\n\n'
-            # )
-
-        return be_price
+
+                if asize_h:
+                    size_last = abs(asize_h[-1])
+                    cb_last = ppu_h[-1] * size_last
+                    ppu = (cost_basis + cb_last) / abs_new_size
+
+                else:
+                    ppu = cost_basis / abs_new_size
+
+                ppu_h.append(ppu)
+                asize_h.append(accum_size)
+
+            else:
+                # on "exit" clears from a given direction,
+                # only the size changes not the price-per-unit
+                # need to be updated since the ppu remains constant
+                # and gets weighted by the new size.
+                asize_h.append(accum_size)
+                ppu_h.append(ppu_h[-1])
+
+        return ppu_h[-1] if ppu_h else 0

     def calc_size(self) -> float:
         size: float = 0
@@ -349,17 +388,21 @@ class Position(Struct):
         unecessary history irrelevant to the current pp state.

         '''
-        size: float = self.size
-        clears_since_zero: deque[tuple(str, dict)] = deque()
-
-        # scan for the last "net zero" position by
-        # iterating clears in reverse.
-        for tid, clear in reversed(self.clears.items()):
-            size -= clear['size']
-            clears_since_zero.appendleft((tid, clear))
+        size: float = 0
+        clears_since_zero: list[tuple(str, dict)] = []
+
+        # TODO: we might just want to always do this when iterating
+        # a ledger? keep a state of the last net-zero and only do the
+        # full iterate when no state was stashed?
+
+        # scan for the last "net zero" position by iterating
+        # transactions until the next net-zero size, rinse, repeat.
+        for tid, clear in self.clears.items():
+            size += clear['size']
+            clears_since_zero.append((tid, clear))

             if size == 0:
-                break
+                clears_since_zero.clear()

         self.clears = dict(clears_since_zero)
         return self.clears
@@ -367,6 +410,7 @@ class Position(Struct):

 class PpTable(Struct):

+    brokername: str
     pps: dict[str, Position]
     conf: Optional[dict] = {}
@@ -378,31 +422,30 @@ class PpTable(Struct):

     ) -> dict[str, Position]:

         pps = self.pps
         updated: dict[str, Position] = {}

         # lifo update all pps from records
-        for tid, r in trans.items():
+        for tid, t in trans.items():

             pp = pps.setdefault(
-                r.bsuid,
+                t.bsuid,

                 # if no existing pp, allocate fresh one.
                 Position(
                     Symbol.from_fqsn(
-                        r.fqsn,
+                        t.fqsn,
                         info={},
                     ),
                     size=0.0,
                     be_price=0.0,
-                    bsuid=r.bsuid,
-                    expiry=r.expiry,
+                    bsuid=t.bsuid,
+                    expiry=t.expiry,
                 )
             )

             # don't do updates for ledger records we already have
             # included in the current pps state.
-            if r.tid in pp.clears:
+            if t.tid in pp.clears:
                 # NOTE: likely you'll see repeats of the same
                 # ``Transaction`` passed in here if/when you are restarting
                 # a ``brokerd.ib`` where the API will re-report trades from
@@ -410,22 +453,35 @@ class PpTable(Struct):
                 # "double count" these in pp calculations.
                 continue

-            # track clearing data
-            pp.add_clear(r)
-            updated[r.bsuid] = pp
+            # update clearing table and populate rolling
+            # ppu and accumulative size.
+            clear = pp.clears[t.tid] = {
+                'cost': t.cost,
+                'price': t.price,
+                'size': t.size,
+                'dt': str(t.dt),
+            }
+
+            # TODO: compute these incrementally instead
+            # of re-looping through each time resulting in O(n**2)
+            # behaviour..
+            # compute these **after** adding the entry
+            # in order to make the recurrence relation math work
+            # inside ``.calc_size()``.
+            clear['accum_size'] = pp.calc_size()
+            clear['ppu'] = pp.calc_ppu()
+
+            updated[t.bsuid] = pp

         # minimize clears tables and update sizing.
         for bsuid, pp in updated.items():
-            pp.minimize_clears()
             pp.size, pp.be_price = pp.audit_sizing()

         return updated

     def dump_active(
         self,
-        brokername: str,
     ) -> tuple[
-        dict[str, Any],
+        dict[str, Position],
         dict[str, Position]
     ]:
         '''
@@ -435,13 +491,12 @@ class PpTable(Struct):
         ``Position``s which have recently closed.

         '''
-        # ONLY dict-serialize all active positions; those that are closed
-        # we don't store in the ``pps.toml``.
         # NOTE: newly closed position are also important to report/return
         # since a consumer, like an order mode UI ;), might want to react
-        # based on the closure.
-        pp_active_entries = {}
+        # based on the closure (for example removing the breakeven line
+        # and clearing the entry from any lists/monitors).
         closed_pp_objs: dict[str, Position] = {}
+        open_pp_objs: dict[str, Position] = {}

         pp_objs = self.pps
         for bsuid in list(pp_objs):
@@ -452,7 +507,6 @@ class PpTable(Struct):
             # if bsuid == qqqbsuid:
             #     breakpoint()

-            pp.minimize_clears()
             size, be_price = pp.audit_sizing()

             if (
@@ -471,48 +525,42 @@ class PpTable(Struct):
                 # ignored; the closed positions won't be written to the
                 # ``pps.toml`` since ``pp_active_entries`` above is what's
                 # written.
-                # closed_pp = pp_objs.pop(bsuid, None)
-                closed_pp = pp_objs.get(bsuid)
-                if closed_pp:
-                    closed_pp_objs[bsuid] = closed_pp
+                closed_pp_objs[bsuid] = pp

             else:
-                # serialize to pre-toml form
-                asdict = pp.to_pretoml()
-
-                # TODO: we need to figure out how to have one top level
-                # listing venue here even when the backend isn't providing
-                # it via the trades ledger..
-                # drop symbol obj in serialized form
-                s = asdict.pop('symbol')
-                fqsn = s.front_fqsn()
-                log.info(f'Updating active pp: {fqsn}')
-
-                # XXX: ugh, it's cuz we push the section under
-                # the broker name.. maybe we need to rethink this?
-                brokerless_key = fqsn.removeprefix(f'{brokername}.')
-
-                pp_active_entries[brokerless_key] = asdict
-
-        return pp_active_entries, closed_pp_objs
+                open_pp_objs[bsuid] = pp
+
+        return open_pp_objs, closed_pp_objs
+
+    def to_toml(
+        self,
+    ) -> dict[str, Any]:
+
+        active, closed = self.dump_active()
+
+        # ONLY dict-serialize all active positions; those that are closed
+        # we don't store in the ``pps.toml``.
+        to_toml_dict = {}
+
+        for bsuid, pos in active.items():
+
+            # keep the minimal amount of clears that make up this
+            # position since the last net-zero state.
+            pos.minimize_clears()
+
+            # serialize to pre-toml form
+            fqsn, asdict = pos.to_pretoml()
+
+            log.info(f'Updating active pp: {fqsn}')
+
+            # XXX: ugh, it's cuz we push the section under
+            # the broker name.. maybe we need to rethink this?
+            brokerless_key = fqsn.removeprefix(f'{self.brokername}.')
+
+            to_toml_dict[brokerless_key] = asdict
+
+        return to_toml_dict


-def update_pps(
-    records: dict[str, Transaction],
-    pps: Optional[dict[str, Position]] = None
-
-) -> dict[str, Position]:
-    '''
-    Compile a set of positions from a trades ledger.
-
-    '''
-    pps: dict[str, Position] = pps or {}
-    table = PpTable(pps)
-    table.update_from_trans(records)
-
-    return table.pps
-
-
-def load_trans_from_ledger(
+def load_pps_from_ledger(
     brokername: str,
     acctname: str,
@@ -520,35 +568,40 @@ def load_trans_from_ledger(
     # post normalization filter on ledger entries to be processed
     filter_by: Optional[list[dict]] = None,

-) -> dict[str, Position]:
+) -> tuple[
+    dict[str, Transaction],
+    dict[str, Position],
+]:
     '''
     Open a ledger file by broker name and account and read in and
-    process any trade records into our normalized ``Transaction``
-    form and then pass these into the position processing routine
-    and deliver the two dict-sets of the active and closed pps.
+    process any trade records into our normalized ``Transaction`` form
+    and then update the equivalent ``Pptable`` and deliver the two
+    bsuid-mapped dict-sets of the transactions and pps.

     '''
-    with open_trade_ledger(
-        brokername,
-        acctname,
-    ) as ledger:
+    with (
+        open_trade_ledger(brokername, acctname) as ledger,
+        open_pps(brokername, acctname) as table,
+    ):
         if not ledger:
             # null case, no ledger file with content
             return {}

-        brokermod = get_brokermod(brokername)
-        src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger)
+        mod = get_brokermod(brokername)
+        src_records: dict[str, Transaction] = mod.norm_trade_records(ledger)

         if filter_by:
             records = {}
             bsuids = set(filter_by)
             for tid, r in src_records.items():
                 if r.bsuid in bsuids:
                     records[tid] = r
         else:
             records = src_records

-        return records
+        updated = table.update_from_trans(records)
+
+        return records, updated


 # TODO: instead see if we can hack tomli and tomli-w to do the same:
@@ -682,67 +735,6 @@ class PpsEncoder(toml.TomlEncoder):
         return (retstr, retdict)


-def load_pps_from_toml(
-    brokername: str,
-    acctid: str,
-
-    # XXX: there is an edge case here where we may want to either audit
-    # the retrieved ``pps.toml`` output or reprocess it since there was
-    # an error on write on the last attempt to update the state file
-    # even though the ledger *was* updated. For this cases we allow the
-    # caller to pass in a symbol set they'd like to reload from the
-    # underlying ledger to be reprocessed in computing pps state.
-    reload_records: Optional[dict[str, str]] = None,
-
-    # XXX: this is "global" update from ledger flag which
-    # does a full refresh of pps from the available ledger.
-    update_from_ledger: bool = False,
-
-) -> tuple[PpTable, dict[str, str]]:
-    '''
-    Load and marshal to objects all pps from either an existing
-    ``pps.toml`` config, or from scratch from a ledger file when
-    none yet exists.
-
-    '''
-    with open_pps(
-        brokername,
-        acctid,
-        write_on_exit=False,
-    ) as table:
-        pp_objs = table.pps
-
-        # no pps entry yet for this broker/account so parse any available
-        # ledgers to build a brand new pps state.
-        if not pp_objs or update_from_ledger:
-            trans = load_trans_from_ledger(
-                brokername,
-                acctid,
-            )
-            table.update_from_trans(trans)
-
-        # Reload symbol specific ledger entries if requested by the
-        # caller **AND** none exist in the current pps state table.
-        elif (
-            pp_objs and reload_records
-        ):
-            # no pps entry yet for this broker/account so parse
-            # any available ledgers to build a pps state.
-            trans = load_trans_from_ledger(
-                brokername,
-                acctid,
-                filter_by=reload_records,
-            )
-            table.update_from_trans(trans)
-
-        if not table.pps:
-            log.warning(
-                f'No `pps.toml` values could be loaded {brokername}:{acctid}'
-            )
-
-    return table, table.conf
-
-
 @cm
 def open_pps(
     brokername: str,
@@ -759,8 +751,22 @@ def open_pps(
     brokersection = conf.setdefault(brokername, {})
     pps = brokersection.setdefault(acctid, {})

+    # TODO: ideally we can pass in an existing
+    # pps state to this right? such that we
+    # don't have to do a ledger reload all the
+    # time.. a couple ideas I can think of,
+    # - mirror this in some client side actor which
+    #   does the actual ledger updates (say the paper
+    #   engine proc if we decide to always spawn it?),
+    # - do diffs against updates from the ledger writer
+    #   actor and the in-mem state here?
+
     pp_objs = {}
-    table = PpTable(pp_objs, conf=conf)
+    table = PpTable(
+        brokername,
+        pp_objs,
+        conf=conf,
+    )

     # unmarshal/load ``pps.toml`` config entries into object form
     # and update `PpTable` obj entries.
@@ -817,7 +823,8 @@ def open_pps(
     # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
     print(f'Updating ``pps.toml`` for {path}:\n')

-    pp_entries, closed_pp_objs = table.dump_active(brokername)
+    # active, closed_pp_objs = table.dump_active()
+    pp_entries = table.to_toml()
     conf[brokername][acctid] = pp_entries

     # TODO: why tf haven't they already done this for inline
@@ -835,66 +842,6 @@ def open_pps(
     )


-def update_pps_conf(
-    brokername: str,
-    acctid: str,
-
-    trade_records: Optional[dict[str, Transaction]] = None,
-    ledger_reload: Optional[dict[str, str]] = None,
-
-) -> tuple[
-    dict[str, Position],
-    dict[str, Position],
-]:
-    # TODO: ideally we can pass in an existing
-    # pps state to this right? such that we
-    # don't have to do a ledger reload all the
-    # time.. a couple ideas I can think of,
-    # - load pps once after backend ledger state
-    #   is loaded and keep maintainend in memory
-    #   inside a with block,
-    # - mirror this in some client side actor which
-    #   does the actual ledger updates (say the paper
-    #   engine proc if we decide to always spawn it?),
-    # - do diffs against updates from the ledger writer
-    #   actor and the in-mem state here?
-
-    if trade_records and ledger_reload:
-        for tid, r in trade_records.items():
-            ledger_reload[r.bsuid] = r.fqsn
-
-    table, conf = load_pps_from_toml(
-        brokername,
-        acctid,
-        reload_records=ledger_reload,
-    )
-
-    # update all pp objects from any (new) trade records which
-    # were passed in (aka incremental update case).
-    if trade_records:
-        table.update_from_trans(trade_records)
-
-    # this maps `.bsuid` values to positions
-    pp_entries, closed_pp_objs = table.dump_active(brokername)
-    pp_objs: dict[Union[str, int], Position] = table.pps
-
-    conf[brokername][acctid] = pp_entries
-
-    # TODO: why tf haven't they already done this for inline tables smh..
-    enc = PpsEncoder(preserve=True)
-    # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
-    enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table
-
-    config.write(
-        conf,
-        'pps',
-        encoder=enc,
-    )
-
-    # deliver object form of all pps in table to caller
-    return pp_objs, closed_pp_objs
-
-
 if __name__ == '__main__':
     import sys
@@ -903,4 +850,9 @@ if __name__ == '__main__':
     args = args[1:]
     for acctid in args:
         broker, name = acctid.split('.')
-        update_pps_conf(broker, name)
+        trans, updated_pps = load_pps_from_ledger(broker, name)
+        print(
+            f'Processing transactions into pps for {broker}:{acctid}\n'
+            f'{pformat(trans)}\n\n'
+            f'{pformat(updated_pps)}'
+        )