Don't pop zero pps from table in `.dump_active()`
In order to avoid double transaction adds/updates and too-early-discard of zero sized pps (like when trades are loaded from a backend broker but were already added to a ledger or `pps.toml` prior) we now **don't** pop such `Position` entries from the `.pps` table in order to keep each position's clears table always in place. This avoids the edge case where an entry was removed too early (due to zero size) but then duplicate trade entries that were in that entrie's clears show up from the backend and are entered into a new entry resulting in an incorrect size in a new entry..We still only push non-net-zero entries to the `pps.toml`. More fixes: - return the updated set of `Positions` from `.lifo_update()`. - return the full table set from `update_pps()`. - use `PpTable.update_from_trans()` more throughout. - always write the `pps.toml` on `open_pps()` exit. - only return table from `load_pps_from_toml()`.pptables
parent
9326379b04
commit
6747831677
87
piker/pp.py
87
piker/pp.py
|
@ -34,7 +34,6 @@ from typing import (
|
|||
Union,
|
||||
)
|
||||
|
||||
from msgspec import Struct
|
||||
import pendulum
|
||||
from pendulum import datetime, now
|
||||
import tomli
|
||||
|
@ -45,6 +44,7 @@ from .brokers import get_brokermod
|
|||
from .clearing._messages import BrokerdPosition, Status
|
||||
from .data._source import Symbol
|
||||
from .log import get_logger
|
||||
from .data.types import Struct
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -314,6 +314,8 @@ class PpTable(Struct):
|
|||
|
||||
pps = self.pps
|
||||
|
||||
updated: dict[str, Position] = {}
|
||||
|
||||
# lifo update all pps from records
|
||||
for tid, r in trans.items():
|
||||
|
||||
|
@ -358,7 +360,9 @@ class PpTable(Struct):
|
|||
# track clearing data
|
||||
pp.update(r)
|
||||
|
||||
return pps
|
||||
updated[r.bsuid] = pp
|
||||
|
||||
return updated
|
||||
|
||||
def dump_active(
|
||||
self,
|
||||
|
@ -393,16 +397,23 @@ class PpTable(Struct):
|
|||
pp.minimize_clears()
|
||||
|
||||
if (
|
||||
# "net-zero" is a "closed" position
|
||||
pp.size == 0
|
||||
|
||||
# drop time-expired positions (normally derivatives)
|
||||
# time-expired pps (normally derivatives) are "closed"
|
||||
or (pp.expiry and pp.expiry < now())
|
||||
):
|
||||
# if expired the position is closed
|
||||
# for expired cases
|
||||
pp.size = 0
|
||||
|
||||
# position is already closed aka "net zero"
|
||||
closed_pp = pp_objs.pop(bsuid, None)
|
||||
# NOTE: we DO NOT pop the pp here since it can still be
|
||||
# used to check for duplicate clears that may come in as
|
||||
# new transaction from some backend API and need to be
|
||||
# ignored; the closed positions won't be written to the
|
||||
# ``pps.toml`` since ``pp_entries`` above is what's
|
||||
# written.
|
||||
# closed_pp = pp_objs.pop(bsuid, None)
|
||||
closed_pp = pp_objs.get(bsuid)
|
||||
if closed_pp:
|
||||
closed_pp_objs[bsuid] = closed_pp
|
||||
|
||||
|
@ -440,7 +451,9 @@ def update_pps(
|
|||
|
||||
'''
|
||||
pps: dict[str, Position] = pps or {}
|
||||
return PpTable(pps).update_from_trans(records)
|
||||
table = PpTable(pps)
|
||||
table.update_from_trans(records)
|
||||
return table.pps
|
||||
|
||||
|
||||
def load_trans_from_ledger(
|
||||
|
@ -629,7 +642,7 @@ def load_pps_from_toml(
|
|||
# does a full refresh of pps from the available ledger.
|
||||
update_from_ledger: bool = False,
|
||||
|
||||
) -> tuple[dict, dict[str, Position]]:
|
||||
) -> tuple[PpTable, dict[str, str]]:
|
||||
'''
|
||||
Load and marshal to objects all pps from either an existing
|
||||
``pps.toml`` config, or from scratch from a ledger file when
|
||||
|
@ -646,9 +659,7 @@ def load_pps_from_toml(
|
|||
brokername,
|
||||
acctid,
|
||||
)
|
||||
# TODO: just call `.update_from_trans()`?
|
||||
ledger_pp_objs = update_pps(trans)
|
||||
pp_objs.update(ledger_pp_objs)
|
||||
table.update_from_trans(trans)
|
||||
|
||||
# Reload symbol specific ledger entries if requested by the
|
||||
# caller **AND** none exist in the current pps state table.
|
||||
|
@ -662,15 +673,14 @@ def load_pps_from_toml(
|
|||
acctid,
|
||||
filter_by=reload_records,
|
||||
)
|
||||
ledger_pp_objs = update_pps(trans)
|
||||
pp_objs.update(ledger_pp_objs)
|
||||
table.update_from_trans(trans)
|
||||
|
||||
if not pp_objs:
|
||||
if not table.pps:
|
||||
log.warning(
|
||||
f'No `pps.toml` values could be loaded {brokername}:{acctid}'
|
||||
)
|
||||
|
||||
return table, table.conf, table.pps
|
||||
return table, table.conf
|
||||
|
||||
|
||||
@cm
|
||||
|
@ -687,6 +697,7 @@ def open_pps(
|
|||
conf, path = config.load('pps')
|
||||
brokersection = conf.setdefault(brokername, {})
|
||||
pps = brokersection.setdefault(acctid, {})
|
||||
|
||||
pp_objs = {}
|
||||
table = PpTable(pp_objs, conf=conf)
|
||||
|
||||
|
@ -747,32 +758,33 @@ def open_pps(
|
|||
clears=clears,
|
||||
)
|
||||
|
||||
orig = pp_objs.copy()
|
||||
# orig = pp_objs.copy()
|
||||
try:
|
||||
yield table
|
||||
finally:
|
||||
if orig != pp_objs:
|
||||
# breakpoint()
|
||||
# if orig != table.pps:
|
||||
|
||||
# TODO: show diff output?
|
||||
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
||||
print(f'Updating ``pps.toml`` for {path}:\n')
|
||||
# TODO: show diff output?
|
||||
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
||||
print(f'Updating ``pps.toml`` for {path}:\n')
|
||||
|
||||
pp_entries, closed_pp_objs = table.dump_active(brokername)
|
||||
conf[brokername][acctid] = pp_entries
|
||||
pp_entries, closed_pp_objs = table.dump_active(brokername)
|
||||
conf[brokername][acctid] = pp_entries
|
||||
|
||||
# TODO: why tf haven't they already done this for inline
|
||||
# tables smh..
|
||||
enc = PpsEncoder(preserve=True)
|
||||
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
|
||||
enc.dump_funcs[
|
||||
toml.decoder.InlineTableDict
|
||||
] = enc.dump_inline_table
|
||||
# TODO: why tf haven't they already done this for inline
|
||||
# tables smh..
|
||||
enc = PpsEncoder(preserve=True)
|
||||
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
|
||||
enc.dump_funcs[
|
||||
toml.decoder.InlineTableDict
|
||||
] = enc.dump_inline_table
|
||||
|
||||
config.write(
|
||||
conf,
|
||||
'pps',
|
||||
encoder=enc,
|
||||
)
|
||||
config.write(
|
||||
conf,
|
||||
'pps',
|
||||
encoder=enc,
|
||||
)
|
||||
|
||||
|
||||
def update_pps_conf(
|
||||
|
@ -803,10 +815,7 @@ def update_pps_conf(
|
|||
for tid, r in trade_records.items():
|
||||
ledger_reload[r.bsuid] = r.fqsn
|
||||
|
||||
# this maps `.bsuid` values to positions
|
||||
pp_objs: dict[Union[str, int], Position]
|
||||
|
||||
table, conf, pp_objs = load_pps_from_toml(
|
||||
table, conf = load_pps_from_toml(
|
||||
brokername,
|
||||
acctid,
|
||||
reload_records=ledger_reload,
|
||||
|
@ -817,7 +826,9 @@ def update_pps_conf(
|
|||
if trade_records:
|
||||
table.update_from_trans(trade_records)
|
||||
|
||||
# this maps `.bsuid` values to positions
|
||||
pp_entries, closed_pp_objs = table.dump_active(brokername)
|
||||
pp_objs: dict[Union[str, int], Position] = table.pps
|
||||
|
||||
conf[brokername][acctid] = pp_entries
|
||||
|
||||
|
|
Loading…
Reference in New Issue