Add a `PpTable` type, give it the update methods

In an effort to begin allowing backends to have more granular control
over position updates, particular in the case where they need to be
reloaded from a trades ledger, this adds a new table API which can
be loaded using `open_pps()`.

- offer an `.update_trans()` method which takes in a `dict` of
  `Transactions` and updates the current table of `Positions` from it.
- add a `.dump_active()` which renders the active pp entries dict in
  a format ready for toml serialization and all closed positions since
  the last update (we might want to not drop these?)

All other module-function apis currently in use should remain working as
before for the moment.
pptables
Tyler Goodlet 2022-07-18 12:23:02 -04:00
parent 09d9a7ea2b
commit 9326379b04
1 changed files with 211 additions and 205 deletions

View File

@ -302,6 +302,134 @@ class Position(Struct):
return self.clears return self.clears
class PpTable(Struct):
pps: dict[str, Position]
conf: Optional[dict] = {}
def update_from_trans(
self,
trans: dict[str, Transaction],
) -> dict[str, Position]:
pps = self.pps
# lifo update all pps from records
for tid, r in trans.items():
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
return pps
def dump_active(
self,
brokername: str,
) -> tuple[
dict[str, Any],
dict[str, Position]
]:
'''
Iterate all tabulated positions, render active positions to
a ``dict`` format amenable to serialization (via TOML) and drop
from state (``.pps``) as well as return in a ``dict`` all
``Position``s which have recently closed.
'''
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
pp_entries = {}
closed_pp_objs: dict[str, Position] = {}
pp_objs = self.pps
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# if bsuid == 447767096:
# breakpoint()
pp.minimize_clears()
if (
pp.size == 0
# drop time-expired positions (normally derivatives)
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
pp.size = 0
# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict
return pp_entries, closed_pp_objs
def update_pps( def update_pps(
records: dict[str, Transaction], records: dict[str, Transaction],
pps: Optional[dict[str, Position]] = None pps: Optional[dict[str, Position]] = None
@ -312,55 +440,10 @@ def update_pps(
''' '''
pps: dict[str, Position] = pps or {} pps: dict[str, Position] = pps or {}
return PpTable(pps).update_from_trans(records)
# lifo update all pps from records
for tid, r in records.items():
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
return pps
def load_pps_from_ledger( def load_trans_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
@ -396,82 +479,7 @@ def load_pps_from_ledger(
else: else:
records = src_records records = src_records
return update_pps(records) return records
@cm
def open_pps(
brokername: str,
acctid: str,
) -> dict[str, dict[str, Position]]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
'''
conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {})
pp_objs = {}
# unmarshal/load ``pps.toml`` config entries into object form.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
pp = pp_objs.get(bsuid)
if pp:
clears = pp.clears
else:
clears = {}
for clears_table in clears_list:
tid = clears_table.pop('tid')
clears[tid] = clears_table
size = entry['size']
# TODO: an audit system for existing pps entries?
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry')
if expiry:
expiry = pendulum.parse(expiry)
pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}),
size=size,
be_price=entry['be_price'],
expiry=expiry,
bsuid=entry['bsuid'],
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
# already included in the current incremental update
# state, since today's records may have already been
# processed!
clears=clears,
)
yield pp_objs
# TODO: instead see if we can hack tomli and tomli-w to do the same: # TODO: instead see if we can hack tomli and tomli-w to do the same:
@ -627,39 +635,63 @@ def load_pps_from_toml(
``pps.toml`` config, or from scratch from a ledger file when ``pps.toml`` config, or from scratch from a ledger file when
none yet exists. none yet exists.
'''
with open_pps(brokername, acctid) as table:
pp_objs = table.pps
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pp_objs or update_from_ledger:
trans = load_trans_from_ledger(
brokername,
acctid,
)
# TODO: just call `.update_from_trans()`?
ledger_pp_objs = update_pps(trans)
pp_objs.update(ledger_pp_objs)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pp_objs and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
trans = load_trans_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
ledger_pp_objs = update_pps(trans)
pp_objs.update(ledger_pp_objs)
if not pp_objs:
log.warning(
f'No `pps.toml` values could be loaded {brokername}:{acctid}'
)
return table, table.conf, table.pps
@cm
def open_pps(
brokername: str,
acctid: str,
) -> dict[str, dict[str, Position]]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
''' '''
conf, path = config.load('pps') conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {}) brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {}) pps = brokersection.setdefault(acctid, {})
pp_objs = {} pp_objs = {}
table = PpTable(pp_objs, conf=conf)
# no pps entry yet for this broker/account so parse any available # unmarshal/load ``pps.toml`` config entries into object form
# ledgers to build a brand new pps state. # and update `PpTable` obj entries.
if not pps or update_from_ledger:
pp_objs = load_pps_from_ledger(
brokername,
acctid,
)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pps and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
pp_objs = load_pps_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
if not pps:
log.warning(
f'No `pps.toml` positions could be loaded {brokername}:{acctid}'
)
# unmarshal/load ``pps.toml`` config entries into object form.
for fqsn, entry in pps.items(): for fqsn, entry in pps.items():
bsuid = entry['bsuid'] bsuid = entry['bsuid']
@ -715,7 +747,32 @@ def load_pps_from_toml(
clears=clears, clears=clears,
) )
return conf, pp_objs orig = pp_objs.copy()
try:
yield table
finally:
if orig != pp_objs:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')
pp_entries, closed_pp_objs = table.dump_active(brokername)
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
def update_pps_conf( def update_pps_conf(
@ -749,7 +806,7 @@ def update_pps_conf(
# this maps `.bsuid` values to positions # this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position] pp_objs: dict[Union[str, int], Position]
conf, pp_objs = load_pps_from_toml( table, conf, pp_objs = load_pps_from_toml(
brokername, brokername,
acctid, acctid,
reload_records=ledger_reload, reload_records=ledger_reload,
@ -758,60 +815,9 @@ def update_pps_conf(
# update all pp objects from any (new) trade records which # update all pp objects from any (new) trade records which
# were passed in (aka incremental update case). # were passed in (aka incremental update case).
if trade_records: if trade_records:
pp_objs = update_pps( table.update_from_trans(trade_records)
trade_records,
pps=pp_objs,
)
pp_entries = {} # dict-serialize all active pps pp_entries, closed_pp_objs = table.dump_active(brokername)
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
closed_pp_objs: dict[str, Position] = {}
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# if bsuid == 447767096:
# breakpoint()
pp.minimize_clears()
if (
pp.size == 0
# drop time-expired positions (normally derivatives)
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
pp.size = 0
# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict
conf[brokername][acctid] = pp_entries conf[brokername][acctid] = pp_entries